@@ -82,7 +82,7 @@ def iter_resp_lines(resp):
82
82
last_was_empty = False # Reset empty-line flag
83
83
else :
84
84
if not last_was_empty :
85
- yield '\n ' # Only print one empty line
85
+ yield '' # Only print one empty line
86
86
last_was_empty = True # Mark that we handled an empty line
87
87
next_newline = buffer .find (b'\n ' )
88
88
@@ -113,24 +113,29 @@ def get_watch_argument_name(self, func):
113
113
return 'watch'
114
114
115
115
def unmarshal_event (self , data , return_type ):
116
- js = json .loads (data )
117
- js ['raw_object' ] = js ['object' ]
118
- # BOOKMARK event is treated the same as ERROR for a quick fix of
119
- # decoding exception
120
- # TODO: make use of the resource_version in BOOKMARK event for more
121
- # efficient WATCH
122
- if return_type and js ['type' ] != 'ERROR' and js ['type' ] != 'BOOKMARK' :
123
- obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
124
- js ['object' ] = self ._api_client .deserialize (obj , return_type )
125
- if hasattr (js ['object' ], 'metadata' ):
126
- self .resource_version = js ['object' ].metadata .resource_version
127
- # For custom objects that we don't have model defined, json
128
- # deserialization results in dictionary
129
- elif (isinstance (js ['object' ], dict ) and 'metadata' in js ['object' ]
130
- and 'resourceVersion' in js ['object' ]['metadata' ]):
131
- self .resource_version = js ['object' ]['metadata' ][
132
- 'resourceVersion' ]
133
- return js
116
+ if not data or data .isspace ():
117
+ return None
118
+ try :
119
+ js = json .loads (data )
120
+ js ['raw_object' ] = js ['object' ]
121
+ # BOOKMARK event is treated the same as ERROR for a quick fix of
122
+ # decoding exception
123
+ # TODO: make use of the resource_version in BOOKMARK event for more
124
+ # efficient WATCH
125
+ if return_type and js ['type' ] != 'ERROR' and js ['type' ] != 'BOOKMARK' :
126
+ obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
127
+ js ['object' ] = self ._api_client .deserialize (obj , return_type )
128
+ if hasattr (js ['object' ], 'metadata' ):
129
+ self .resource_version = js ['object' ].metadata .resource_version
130
+ # For custom objects that we don't have model defined, json
131
+ # deserialization results in dictionary
132
+ elif (isinstance (js ['object' ], dict ) and 'metadata' in js ['object' ]
133
+ and 'resourceVersion' in js ['object' ]['metadata' ]):
134
+ self .resource_version = js ['object' ]['metadata' ][
135
+ 'resourceVersion' ]
136
+ return js
137
+ except json .JSONDecodeError :
138
+ return None
134
139
135
140
def stream (self , func , * args , ** kwargs ):
136
141
"""Watch an API resource and stream the result back via a generator.
@@ -209,7 +214,7 @@ def stream(self, func, *args, **kwargs):
209
214
yield line # Normal non-empty line
210
215
last_was_empty = False
211
216
elif not last_was_empty :
212
- yield '/n ' # Only yield one empty line
217
+ yield '' # Only yield one empty line
213
218
last_was_empty = True
214
219
if self ._stop :
215
220
break
0 commit comments