@@ -4,13 +4,13 @@
 
 import datetime
 import json
+from urllib.parse import urlparse
+from enum import Enum
+
 import streamsx.spl.op
 import streamsx.spl.types
-
 from streamsx.topology.schema import CommonSchema, StreamSchema
-from urllib.parse import urlparse
 from streamsx.toolkits import download_toolkit
-from enum import Enum
 
 
 _TOOLKIT_NAME = 'com.ibm.streamsx.hdfs'
@@ -33,15 +33,12 @@
 ``'tuple<rstring fileName>'``
 """
 
-
-
-def _add_toolkit_dependency(topo, version):
+def _add_toolkit_dependency(topo):
     # IMPORTANT: Dependency of this python wrapper to a specific toolkit version
     # This is important when toolkit is not set with streamsx.spl.toolkit.add_toolkit (selecting toolkit from remote build service)
-    streamsx.spl.toolkit.add_toolkit_dependency(topo, _TOOLKIT_NAME, version)
-
+    streamsx.spl.toolkit.add_toolkit_dependency(topo, _TOOLKIT_NAME, '[5.0.0,6.0.0)')
 
-def _read_ae_service_credentials(credentials):
+def _read_service_credentials(credentials):
     hdfs_uri = ""
     user = ""
     password = ""
@@ -64,6 +61,23 @@ def _read_ae_service_credentials(credentials):
     uri_parsed = urlparse(hdfs_uri)
     hdfs_uri = 'webhdfs://' + uri_parsed.netloc
     return hdfs_uri, user, password
+
+def _check_version_credentials(credentials, _op, topology):
+    # add the dependency on a compatible streamsx.hdfs toolkit version
+    _add_toolkit_dependency(topology)
+
+    if isinstance(credentials, dict):
+        hdfs_uri, user, password = _read_service_credentials(credentials)
+        _op.params['hdfsUri'] = hdfs_uri
+        _op.params['hdfsUser'] = user
+        _op.params['hdfsPassword'] = password
+    # check if credentials is a valid JSON string
+    elif _is_a_valid_json(credentials):
+        _op.params['credentials'] = credentials
+    else:
+        # expect core-site.xml file in credentials param
+        topology.add_file_dependency(credentials, 'etc')
+        _op.params['configPath'] = 'etc'
 
 def _check_time_param(time_value, parameter_name):
     if isinstance(time_value, datetime.timedelta):
@@ -76,6 +90,15 @@ def _check_time_param(time_value, parameter_name):
         raise ValueError("Invalid " + parameter_name + " value. Value must be at least one second.")
     return result
 
+def _is_a_valid_json(credentials):
+    # check whether the input string is valid JSON
+    try:
+        json.loads(credentials)
+        return True
+    except (ValueError, TypeError):
+        pass
+    return False
+
 class CopyDirection(Enum):
     """Defines File Copy directions for HDFS2FileCopy.
 
@@ -184,6 +207,7 @@ def download_toolkit(url=None, target_dir=None):
     _toolkit_location = streamsx.toolkits.download_toolkit(toolkit_name=_TOOLKIT_NAME, url=url, target_dir=target_dir)
     return _toolkit_location
 
+
 
 def scan(topology, credentials, directory, pattern=None, init_delay=None, name=None):
     """Scans a Hadoop Distributed File System directory for new or modified files.
@@ -205,19 +229,7 @@ def scan(topology, credentials, directory, pattern=None, init_delay=None, name=N
 
     _op = _HDFS2DirectoryScan(topology, directory=directory, pattern=pattern, schema=DirectoryScanSchema, name=name)
 
-    if isinstance(credentials, dict):
-        hdfs_uri, user, password = _read_ae_service_credentials(credentials)
-        _op.params['hdfsUri'] = hdfs_uri
-        _op.params['hdfsUser'] = user
-        _op.params['hdfsPassword'] = password
-    else:
-        # JSON string
-        if credentials.startswith('{'):
-            _op.params['credentials'] = credentials
-        else:
-            # expect core-site.xml file in credentials param
-            topology.add_file_dependency(credentials, 'etc')
-            _op.params['configPath'] = 'etc'
+    _check_version_credentials(credentials, _op, topology)
 
     if init_delay is not None:
         _op.params['initDelay'] = streamsx.spl.types.float64(_check_time_param(init_delay, 'init_delay'))
@@ -241,20 +253,8 @@ def read(stream, credentials, schema=CommonSchema.String, name=None):
 
 
     _op = _HDFS2FileSource(stream, schema=schema, name=name)
-
-    if isinstance(credentials, dict):
-        hdfs_uri, user, password = _read_ae_service_credentials(credentials)
-        _op.params['hdfsUri'] = hdfs_uri
-        _op.params['hdfsUser'] = user
-        _op.params['hdfsPassword'] = password
-    else:
-        # JSON string
-        if credentials.startswith('{'):
-            _op.params['credentials'] = credentials
-        else:
-            # expect core-site.xml file in credentials param
-            stream.topology.add_file_dependency(credentials, 'etc')
-            _op.params['configPath'] = 'etc'
+
+    _check_version_credentials(credentials, _op, stream.topology)
 
     return _op.outputs[0]
 
@@ -291,25 +291,14 @@ def write(stream, credentials, file=None, fileAttributeName=None, schema=None, t
     Returns:
         Output Stream with schema :py:const:`~streamsx.hdfs.FileInfoSchema`.
     """
-
     # check bytes_per_file, time_per_file and tuples_per_file parameters
     if (time_per_file is not None and tuples_per_file is not None) or (tuples_per_file is not None and bytes_per_file is not None) or (time_per_file is not None and bytes_per_file is not None):
         raise ValueError("The parameters are mutually exclusive: bytes_per_file, time_per_file, tuples_per_file")
 
     _op = _HDFS2FileSink(stream, file=file, fileAttributeName=fileAttributeName, schema=FileInfoSchema, name=name)
+
+    _check_version_credentials(credentials, _op, stream.topology)
 
-    if isinstance(credentials, dict):
-        hdfs_uri, user, password = _read_ae_service_credentials(credentials)
-        _op.params['hdfsUri'] = hdfs_uri
-        _op.params['hdfsUser'] = user
-        _op.params['hdfsPassword'] = password
-    else:
-        if credentials.startswith('{'):
-            _op.params['credentials'] = credentials
-        else:
-            # expect core-site.xml file in credentials param
-            stream.topology.add_file_dependency(credentials, 'etc')
-            _op.params['configPath'] = 'etc'
 
     if time_per_file is None and tuples_per_file is None and bytes_per_file is None:
         _op.params['closeOnPunct'] = _op.expression('true')
@@ -343,20 +332,9 @@ def copy(stream, credentials, direction, hdfsFile=None, hdfsFileAttrName=None, l
     Direction = _convert_copy_direction_string_to_enum(direction)
 
     _op = _HDFS2FileCopy(stream, direction=Direction, hdfsFileAttrName=hdfsFileAttrName, localFile=localFile, schema=FileCopySchema, name=name)
-
-    if isinstance(credentials, dict):
-        hdfs_uri, user, password = _read_ae_service_credentials(credentials)
-        _op.params['hdfsUri'] = hdfs_uri
-        _op.params['hdfsUser'] = user
-        _op.params['hdfsPassword'] = password
-    else:
-        if credentials.startswith('{'):
-            _op.params['credentials'] = credentials
-        else:
-            # expect core-site.xml file in credentials param
-            stream.topology.add_file_dependency(credentials, 'etc')
-            _op.params['configPath'] = 'etc'
 
+    _check_version_credentials(credentials, _op, stream.topology)
+
     return _op.outputs[0]
 
 
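
Below is an illustrative usage sketch, not part of this commit, showing how the consolidated credential handling introduced here is exercised through the public scan() and read() wrappers (assuming they are re-exported at the streamsx.hdfs package level, as in the released package). The topology name, the 'service_credentials.json' file, the HDFS directory, and the core-site.xml path are hypothetical placeholders; the exact key layout expected inside a credentials dict is not shown in this diff.

# Illustrative sketch only -- assumes the streamsx and streamsx.hdfs packages are installed.
import json

from streamsx.topology.topology import Topology
import streamsx.hdfs as hdfs

topo = Topology('HdfsScanSample')  # hypothetical topology name

# 1) dict credentials: hdfsUri/hdfsUser/hdfsPassword are derived via _read_service_credentials()
with open('service_credentials.json') as f:  # hypothetical credentials file
    credentials = json.load(f)

# 2) JSON string credentials: passed through unchanged as the 'credentials' operator parameter
# credentials = open('service_credentials.json').read()

# 3) any other string (not a dict, not valid JSON): treated as the path to a
#    core-site.xml file, bundled under 'etc' and referenced via 'configPath'
# credentials = '/path/to/core-site.xml'

files = hdfs.scan(topo, credentials=credentials, directory='/user/streamsx/sample', init_delay=10.0)
lines = hdfs.read(files, credentials=credentials)
lines.print()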