-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add support for reading remote storage systems #811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
0821202
6f59715
5545ac7
b0a353c
42b6f43
9779395
9a8614b
90e4d88
af106a9
2c2650b
cf2c203
0596c9c
9869c69
908f445
a9f9a5e
7d894cc
f6239b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,7 +85,8 @@ use self::state::{ConfigBackendClient, SchedulerState}; | |
| use ballista_core::config::BallistaConfig; | ||
| use ballista_core::execution_plans::ShuffleWriterExec; | ||
| use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; | ||
| use datafusion::physical_plan::parquet::ParquetExec; | ||
| use ballista_core::utils::create_datafusion_context_concurrency; | ||
| use datafusion::datasource::parquet::ParquetRootDesc; | ||
| use datafusion::prelude::{ExecutionConfig, ExecutionContext}; | ||
| use std::time::{Instant, SystemTime, UNIX_EPOCH}; | ||
|
|
||
|
|
@@ -285,24 +286,19 @@ impl SchedulerGrpc for SchedulerServer { | |
|
|
||
| match file_type { | ||
| FileType::Parquet => { | ||
| let parquet_exec = | ||
| ParquetExec::try_from_path(&path, None, None, 1024, 1, None) | ||
| .map_err(|e| { | ||
| let msg = format!("Error opening parquet files: {}", e); | ||
| error!("{}", msg); | ||
| tonic::Status::internal(msg) | ||
| })?; | ||
| let ctx = create_datafusion_context_concurrency(1); | ||
| let parquet_desc = ParquetRootDesc::new(&path, ctx).map_err(|e| { | ||
| let msg = format!("Error opening parquet files: {}", e); | ||
| error!("{}", msg); | ||
| tonic::Status::internal(msg) | ||
| })?; | ||
|
|
||
| //TODO include statistics and any other info needed to reconstruct ParquetExec | ||
| Ok(Response::new(GetFileMetadataResult { | ||
| schema: Some(parquet_exec.schema().as_ref().into()), | ||
| partitions: parquet_exec | ||
| .partitions() | ||
| .iter() | ||
| .map(|part| FilePartitionMetadata { | ||
| filename: part.filenames().to_vec(), | ||
| }) | ||
| .collect(), | ||
| schema: Some(parquet_desc.schema().as_ref().into()), | ||
| partitions: vec![FilePartitionMetadata { | ||
| filename: vec![path], | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are we always returning a single path for the partitions field? This changes the behavior, doesn't it?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The behavior is indeed unchanged: the original filenames all come from the root_path, and here I just use the root_path instead, to avoid touching too much code in the Ballista proto definition as well as its serde (to_proto and from_proto). |
||
| }], | ||
| })) | ||
| } | ||
| //TODO implement for CSV | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,12 +40,14 @@ use std::string::String; | |
| use std::sync::{Arc, Mutex}; | ||
|
|
||
| use crate::datasource::datasource::Statistics; | ||
| use crate::datasource::local::LocalFileSystem; | ||
| use crate::datasource::object_store::ObjectStore; | ||
| use crate::datasource::{Source, TableProvider}; | ||
| use crate::error::{DataFusionError, Result}; | ||
| use crate::logical_plan::Expr; | ||
| use crate::physical_plan::csv::CsvExec; | ||
| pub use crate::physical_plan::csv::CsvReadOptions; | ||
| use crate::physical_plan::{common, ExecutionPlan}; | ||
| use crate::physical_plan::ExecutionPlan; | ||
|
|
||
| /// Represents a CSV file with a provided schema | ||
| pub struct CsvFile { | ||
|
|
@@ -64,7 +66,8 @@ impl CsvFile { | |
| let schema = Arc::new(match options.schema { | ||
| Some(s) => s.clone(), | ||
| None => { | ||
| let filenames = common::build_file_list(&path, options.file_extension)?; | ||
| let filenames = LocalFileSystem | ||
|
||
| .list_all_files(path.as_str(), options.file_extension)?; | ||
| if filenames.is_empty() { | ||
| return Err(DataFusionError::Plan(format!( | ||
| "No files found at {path} with file extension {file_extension}", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
yjshen marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Object store that represents the Local File System. | ||
| use crate::datasource::object_store::{ObjectReader, ObjectStore}; | ||
| use crate::error::DataFusionError; | ||
| use crate::error::Result; | ||
| use crate::parquet::file::reader::Length; | ||
| use crate::parquet::file::serialized_reader::FileSource; | ||
| use std::any::Any; | ||
| use std::fs; | ||
| use std::fs::{metadata, File}; | ||
| use std::io::Read; | ||
| use std::sync::Arc; | ||
|
|
||
| #[derive(Debug)] | ||
| /// Local File System as Object Store. | ||
| pub struct LocalFileSystem; | ||
|
|
||
| impl ObjectStore for LocalFileSystem { | ||
| fn as_any(&self) -> &dyn Any { | ||
| self | ||
| } | ||
|
|
||
| fn list_all_files(&self, path: &str, ext: &str) -> Result<Vec<String>> { | ||
| list_all(path, ext) | ||
| } | ||
|
|
||
| fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>> { | ||
| let file = File::open(file_path)?; | ||
| let reader = LocalFSObjectReader::new(file)?; | ||
| Ok(Arc::new(reader)) | ||
| } | ||
| } | ||
|
|
||
| struct LocalFSObjectReader { | ||
| file: File, | ||
| } | ||
|
|
||
| impl LocalFSObjectReader { | ||
| fn new(file: File) -> Result<Self> { | ||
| Ok(Self { file }) | ||
| } | ||
| } | ||
|
|
||
| impl ObjectReader for LocalFSObjectReader { | ||
| fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read> { | ||
| Box::new(FileSource::<File>::new(&self.file, start, length)) | ||
| } | ||
|
|
||
| fn length(&self) -> u64 { | ||
| self.file.len() | ||
| } | ||
| } | ||
|
|
||
| fn list_all(root_path: &str, ext: &str) -> Result<Vec<String>> { | ||
| let mut filenames: Vec<String> = Vec::new(); | ||
| list_all_files(root_path, &mut filenames, ext)?; | ||
| Ok(filenames) | ||
| } | ||
|
|
||
| /// Recursively build a list of files in a directory with a given extension with an accumulator list | ||
| fn list_all_files(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> { | ||
| let metadata = metadata(dir)?; | ||
| if metadata.is_file() { | ||
| if dir.ends_with(ext) { | ||
| filenames.push(dir.to_string()); | ||
| } | ||
| } else { | ||
| for entry in fs::read_dir(dir)? { | ||
| let entry = entry?; | ||
| let path = entry.path(); | ||
| if let Some(path_name) = path.to_str() { | ||
| if path.is_dir() { | ||
| list_all_files(path_name, filenames, ext)?; | ||
| } else if path_name.ends_with(ext) { | ||
| filenames.push(path_name.to_string()); | ||
| } | ||
| } else { | ||
| return Err(DataFusionError::Plan("Invalid path".to_string())); | ||
| } | ||
| } | ||
| } | ||
| Ok(()) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.