Skip to content

Commit 77e1eb2

Browse files
committed
registry
1 parent f92332b commit 77e1eb2

File tree

8 files changed

+197
-145
lines changed

8 files changed

+197
-145
lines changed

datafusion/src/datasource/datasource2.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,28 +43,23 @@ pub struct FilePartition {
4343
}
4444

4545
#[derive(Debug, Clone)]
46-
pub struct ParquetSourceDesc {
47-
partition_files: Vec<PartitionedFile>,
48-
schema: SchemaRef,
46+
pub struct SourceDescriptor {
47+
pub partition_files: Vec<PartitionedFile>,
48+
pub schema: SchemaRef,
4949
}
5050

51-
pub trait DataSource2<R>: Send + Sync
52-
where
53-
R: ChunkReader + 'static,
54-
{
51+
pub trait DataSource2<R: ChunkReader + 'static>: Send + Sync {
5552
fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;
5653

5754
fn schema(&self) -> Result<Arc<Schema>>;
5855

5956
fn get_read_for_file(&self, partitioned_file: PartitionedFile) -> Result<R>;
6057

6158
fn statistics(&self) -> &Statistics;
59+
}
6260

63-
fn new_chunk_reader(file_path: &str) -> Result<R>;
64-
65-
fn get_all_files(root_path: &str) -> Result<Vec<String>>;
66-
67-
fn get_source_desc(root_path: &str) -> Result<ParquetSourceDesc> {
61+
pub trait SourceDescBuilder<R: ChunkReader + 'static> {
62+
fn get_source_desc(root_path: &str) -> Result<SourceDescriptor> {
6863
let filenames = Self::get_all_files(root_path)?;
6964
if filenames.is_empty() {
7065
return Err(DataFusionError::Plan(format!(
@@ -90,22 +85,31 @@ where
9085
// schema merging and this is a limitation.
9186
// See https://issues.apache.org/jira/browse/ARROW-11017
9287
return Err(DataFusionError::Plan(format!(
93-
"The Parquet file {} have different schema from the first file and DataFusion does \
88+
"The file {} have different schema from the first file and DataFusion does \
9489
not yet support schema merging",
9590
file_path
9691
)));
9792
}
9893
Ok(pf)
9994
}).collect::<Result<Vec<PartitionedFile>>>();
10095

101-
Ok(ParquetSourceDesc {
96+
Ok(SourceDescriptor {
10297
partition_files: partitioned_files?,
10398
schema: Arc::new(schemas.pop().unwrap()),
10499
})
105100
}
106101

107-
fn get_file_meta(file_path: &str) -> Result<PartitionedFile> {
108-
let chunk_reader = Self::new_chunk_reader(file_path)?;
102+
fn get_all_files(root_path: &str) -> Result<Vec<String>>;
103+
104+
fn get_file_meta(file_path: &str) -> Result<PartitionedFile>;
105+
106+
fn reader_for_file_meta(file_path: &str) -> Result<R>;
107+
}
108+
109+
pub trait ParquetSourceDescBuilder: SourceDescBuilder {
110+
111+
fn get_file_meta(file_path: &str) {
112+
let chunk_reader = Self::reader_for_file_meta(file_path)?;
109113
let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
110114
let arrow_reader = ParquetFileArrowReader::new(file_reader);
111115
let file_path = file_path.to_string();

datafusion/src/datasource/local/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,16 @@ pub mod parquet_source;
1919

2020
use crate::error::DataFusionError;
2121
use crate::error::Result;
22+
use super::datasource2::DataSource2;
2223
use std::fs;
2324
use std::fs::metadata;
2425

26+
struct LocalFSHander {}
27+
28+
impl DataSource2 for LocalFSHander {
29+
30+
}
31+
2532
/// Recursively build a list of files in a directory with a given extension with an accumulator list
2633
fn list_all_files(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
2734
let metadata = metadata(dir)?;

datafusion/src/datasource/local/parquet_source.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::datasource::datasource2::DataSource2;
19-
use crate::datasource::datasource2::ParquetSourceDesc;
18+
use crate::datasource::datasource2::SourceDescBuilder;
19+
use crate::datasource::datasource2::SourceDescriptor;
2020
use crate::error::Result;
2121
use std::fs::File;
2222

2323
struct LocalParquetSource {
24-
desc: ParquetSourceDesc,
24+
desc: SourceDescriptor,
2525
}
2626

2727
impl LocalParquetSource {
@@ -32,8 +32,8 @@ impl LocalParquetSource {
3232
}
3333
}
3434

35-
impl DataSource2<File> for LocalParquetSource {
36-
fn new_chunk_reader(file_path: &str) -> Result<File> {
35+
impl SourceDescBuilder<File> for LocalParquetSource {
36+
fn reader_for_file_meta(file_path: &str) -> Result<File> {
3737
Ok(File::open(file_path)?)
3838
}
3939

datafusion/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub mod json;
2525
pub mod memory;
2626
pub mod parquet;
2727
pub mod local;
28+
pub mod protocol_registry;
2829

2930
pub use self::csv::{CsvFile, CsvReadOptions};
3031
pub use self::datasource::{TableProvider, TableType};

datafusion/src/datasource/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl<R: ChunkReader + 'static> ParquetTable<R> {
4747
max_concurrency: usize,
4848
) -> Result<Self> {
4949
Ok(Self {
50-
source,
50+
source: Box::new(source),
5151
max_concurrency,
5252
enable_pruning: true,
5353
})
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::sync::{Arc, RwLock};
20+
21+
use super::datasource2::DataSource2;
22+
23+
pub trait ProtocolHander<R: ChunkReader + 'static>: Sync + Send {
24+
/// Returns the protocol handler as [`Any`](std::any::Any)
25+
/// so that it can be downcast to a specific implementation.
26+
fn as_any(&self) -> &dyn Any;
27+
28+
fn source(&self,
29+
30+
) -> Result<Arc<dyn DataSource2>>;
31+
32+
fn list_all_files(&self, root_path: &str) -> Result<Vec<String>>;
33+
34+
fn get_reader(&self, file_path: &str) -> Result<R>;
35+
}
36+
37+
struct LocalFSHander {
38+
39+
}
40+
41+
impl ProtocolHander for LocalFSHander {
42+
43+
}
44+
45+
pub struct ProtocolRegistry {
46+
pub protocol_handlers: RwLock<HashMap<String, Arc<dyn ProtocolHander>>>,
47+
}
48+
49+
impl ProtocolRegistry {
50+
pub fn new() -> Self {
51+
Self {
52+
protocol_handlers: RwLock::new(HashMap::new()),
53+
}
54+
}
55+
56+
/// Adds a new handler to this registry.
57+
/// If a handler of the same prefix existed before, it is replaced in the registry and returned.
58+
pub fn register_handler(
59+
&self,
60+
prefix: &str,
61+
handler: Arc<dyn ProtocolHander>,
62+
) -> Option<Arc<dyn ProtocolHander>> {
63+
let mut handler = self.protocol_handlers.write().unwrap();
64+
handler.insert(prefix.to_string(), handler)
65+
}
66+
67+
pub fn handler(&self, prefix: &str) -> Option<Arc<dyn ProtocolHander>> {
68+
let handler = self.protocol_handlers.read().unwrap();
69+
handler.get(prefix).cloned()
70+
}
71+
}

datafusion/src/execution/context.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ impl ExecutionContext {
156156
.register_catalog(config.default_catalog.clone(), default_catalog);
157157
}
158158

159+
let protocol_registry = ProtocolRegistry::new();
160+
protocol_registry.register_handler("file", )
161+
159162
Self {
160163
state: Arc::new(Mutex::new(ExecutionContextState {
161164
catalog_list,
@@ -164,6 +167,7 @@ impl ExecutionContext {
164167
aggregate_functions: HashMap::new(),
165168
config,
166169
execution_props: ExecutionProps::new(),
170+
protocol_registry,
167171
})),
168172
}
169173
}
@@ -358,6 +362,20 @@ impl ExecutionContext {
358362
state.catalog_list.register_catalog(name, catalog)
359363
}
360364

365+
pub fn register_protocol_handler(
366+
&self,
367+
prefix: &str,
368+
handler: Arc<dyn ProtocolHander>,
369+
) -> Option<Arc<dyn ProtocolHander>> {
370+
let prefix = prefix.to_string();
371+
372+
self.state
373+
.lock()
374+
.unwrap()
375+
.protocol_registry
376+
.register_handler(prefix, handler)
377+
}
378+
361379
/// Retrieves a `CatalogProvider` instance by name
362380
pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
363381
self.state.lock().unwrap().catalog_list.catalog(name)
@@ -840,6 +858,8 @@ pub struct ExecutionContextState {
840858
pub config: ExecutionConfig,
841859
/// Execution properties
842860
pub execution_props: ExecutionProps,
861+
/// Protocol handlers
862+
pub protocol_registry: ProtocolRegistry,
843863
}
844864

845865
impl ExecutionProps {
@@ -867,6 +887,7 @@ impl ExecutionContextState {
867887
aggregate_functions: HashMap::new(),
868888
config: ExecutionConfig::new(),
869889
execution_props: ExecutionProps::new(),
890+
protocol_registry: ProtocolRegistry::new(),
870891
}
871892
}
872893

0 commit comments

Comments
 (0)