@@ -129,6 +129,7 @@ def save(
129
129
catalog_type : str = 'dict' ,
130
130
to_csv_kwargs : dict = None ,
131
131
json_dump_kwargs : dict = None ,
132
+ storage_options : typing .Dict [str , typing .Any ] = None ,
132
133
) -> None :
133
134
"""
134
135
Save the catalog to a file.
@@ -138,14 +139,18 @@ def save(
138
139
name: str
139
140
The name of the file to save the catalog to.
140
141
directory: str
141
- The directory to save the catalog to. If None, use the current directory
142
+ The directory or cloud storage bucket to save the catalog to.
143
+ If None, use the current directory.
142
144
catalog_type: str
143
145
The type of catalog to save. Whether to save the catalog table as a dictionary
144
146
in the JSON file or as a separate CSV file. Valid options are 'dict' and 'file'.
145
147
to_csv_kwargs : dict, optional
146
148
Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` method.
147
149
json_dump_kwargs : dict, optional
148
150
Additional keyword arguments passed through to the :py:func:`~json.dump` function.
151
+ storage_options : dict, optional
152
+ fsspec parameters passed to the backend file-system, such as Google Cloud Storage
153
+ or Amazon Web Services S3.
149
154
150
155
Notes
151
156
-----
@@ -158,13 +163,12 @@ def save(
158
163
raise ValueError (
159
164
f'catalog_type must be either "dict" or "file". Received catalog_type={ catalog_type } '
160
165
)
161
- csv_file_name = pathlib .Path (f'{ name } .csv' )
162
- json_file_name = pathlib .Path (f'{ name } .json' )
163
- if directory :
164
- directory = pathlib .Path (directory )
165
- directory .mkdir (parents = True , exist_ok = True )
166
- csv_file_name = directory / csv_file_name
167
- json_file_name = directory / json_file_name
166
+ if isinstance (directory , pathlib .Path ):
167
+ directory = str (directory )
168
+ mapper = fsspec .get_mapper (directory or '.' , storage_options = storage_options )
169
+ fs = mapper .fs
170
+ csv_file_name = f'{ mapper .fs .protocol } ://{ mapper .root } /{ name } .csv'
171
+ json_file_name = f'{ mapper .fs .protocol } ://{ mapper .root } /{ name } .json'
168
172
169
173
data = self .dict ().copy ()
170
174
for key in {'catalog_dict' , 'catalog_file' }:
@@ -179,11 +183,13 @@ def save(
179
183
extensions = {'gzip' : '.gz' , 'bz2' : '.bz2' , 'zip' : '.zip' , 'xz' : '.xz' , None : '' }
180
184
csv_file_name = f'{ csv_file_name } { extensions [compression ]} '
181
185
data ['catalog_file' ] = str (csv_file_name )
182
- self .df .to_csv (csv_file_name , ** csv_kwargs )
186
+
187
+ with fs .open (csv_file_name , 'wb' ) as csv_outfile :
188
+ self .df .to_csv (csv_outfile , ** csv_kwargs )
183
189
else :
184
190
data ['catalog_dict' ] = self .df .to_dict (orient = 'records' )
185
191
186
- with open (json_file_name , 'w' ) as outfile :
192
+ with fs . open (json_file_name , 'w' ) as outfile :
187
193
json_kwargs = {'indent' : 2 }
188
194
json_kwargs .update (json_dump_kwargs or {})
189
195
json .dump (data , outfile , ** json_kwargs )
@@ -350,12 +356,13 @@ def search(
350
356
351
357
"""
352
358
353
- if not isinstance (query , QueryModel ):
354
- _query = QueryModel (
359
+ _query = (
360
+ query
361
+ if isinstance (query , QueryModel )
362
+ else QueryModel (
355
363
query = query , require_all_on = require_all_on , columns = self .df .columns .tolist ()
356
364
)
357
- else :
358
- _query = query
365
+ )
359
366
360
367
results = search (
361
368
df = self .df , query = _query .query , columns_with_iterables = self .columns_with_iterables
0 commit comments