41
41
def valid_search_headers (headers : CaseInsensitiveDict ) -> bool :
42
42
"""Validate if this search is usable."""
43
43
# pylint: disable=invalid-name
44
- udn = headers .get ("_udn" ) # type: Optional[str]
45
- st = headers .get ("st" ) # type: Optional[str]
46
- location = headers .get ("location" , "" ) # type: str
44
+ udn = headers .get_lower ("_udn" ) # type: Optional[str]
45
+ st = headers .get_lower ("st" ) # type: Optional[str]
46
+ location = headers .get_lower ("location" , "" ) # type: str
47
47
return bool (
48
48
udn
49
49
and st
@@ -60,10 +60,10 @@ def valid_search_headers(headers: CaseInsensitiveDict) -> bool:
60
60
def valid_advertisement_headers (headers : CaseInsensitiveDict ) -> bool :
61
61
"""Validate if this advertisement is usable for connecting to a device."""
62
62
# pylint: disable=invalid-name
63
- udn = headers .get ("_udn" ) # type: Optional[str]
64
- nt = headers .get ("nt" ) # type: Optional[str]
65
- nts = headers .get ("nts" ) # type: Optional[str]
66
- location = headers .get ("location" , "" ) # type: str
63
+ udn = headers .get_lower ("_udn" ) # type: Optional[str]
64
+ nt = headers .get_lower ("nt" ) # type: Optional[str]
65
+ nts = headers .get_lower ("nts" ) # type: Optional[str]
66
+ location = headers .get_lower ("location" , "" ) # type: str
67
67
return bool (
68
68
udn
69
69
and nt
@@ -81,22 +81,22 @@ def valid_advertisement_headers(headers: CaseInsensitiveDict) -> bool:
81
81
def valid_byebye_headers (headers : CaseInsensitiveDict ) -> bool :
82
82
"""Validate if this advertisement has required headers for byebye."""
83
83
# pylint: disable=invalid-name
84
- udn = headers .get ("_udn" ) # type: Optional[str]
85
- nt = headers .get ("nt" ) # type: Optional[str]
86
- nts = headers .get ("nts" ) # type: Optional[str]
84
+ udn = headers .get_lower ("_udn" ) # type: Optional[str]
85
+ nt = headers .get_lower ("nt" ) # type: Optional[str]
86
+ nts = headers .get_lower ("nts" ) # type: Optional[str]
87
87
return bool (udn and nt and nts )
88
88
89
89
90
90
def extract_valid_to (headers : CaseInsensitiveDict ) -> datetime :
91
91
"""Extract/create valid to."""
92
- cache_control = headers .get ("cache-control" , "" )
92
+ cache_control = headers .get_lower ("cache-control" , "" )
93
93
match = CACHE_CONTROL_RE .search (cache_control )
94
94
if match :
95
95
max_age = int (match [1 ])
96
96
uncache_after = timedelta (seconds = max_age )
97
97
else :
98
98
uncache_after = DEFAULT_MAX_AGE
99
- timestamp : datetime = headers [ "_timestamp" ]
99
+ timestamp : datetime = headers . get_lower ( "_timestamp" )
100
100
return timestamp + uncache_after
101
101
102
102
@@ -247,7 +247,7 @@ def ip_version_from_location(location: str) -> Optional[int]:
247
247
248
248
def location_changed (ssdp_device : SsdpDevice , headers : CaseInsensitiveDict ) -> bool :
249
249
"""Test if location changed for device."""
250
- new_location = headers .get ("location" , "" )
250
+ new_location = headers .get_lower ("location" , "" )
251
251
if not new_location :
252
252
return False
253
253
@@ -295,14 +295,14 @@ def see_search(
295
295
_LOGGER .debug ("Received invalid search headers: %s" , headers )
296
296
return False , None , None , None
297
297
298
- udn = headers [ "_udn" ]
298
+ udn = headers . get_lower ( "_udn" )
299
299
is_new_device = udn not in self .devices
300
300
301
301
ssdp_device , new_location = self ._see_device (headers )
302
302
if not ssdp_device :
303
303
return False , None , None , None
304
304
305
- search_target : SearchTarget = headers [ "ST" ]
305
+ search_target : SearchTarget = headers . get_lower ( "st" )
306
306
is_new_service = (
307
307
search_target not in ssdp_device .advertisement_headers
308
308
and search_target not in ssdp_device .search_headers
@@ -339,14 +339,14 @@ def see_advertisement(
339
339
_LOGGER .debug ("Received invalid advertisement headers: %s" , headers )
340
340
return False , None , None
341
341
342
- udn = headers [ "_udn" ]
342
+ udn = headers . get_lower ( "_udn" )
343
343
is_new_device = udn not in self .devices
344
344
345
345
ssdp_device , new_location = self ._see_device (headers )
346
346
if not ssdp_device :
347
347
return False , None , None
348
348
349
- notification_type : NotificationType = headers [ "NT" ]
349
+ notification_type : NotificationType = headers . get_lower ( "nt" )
350
350
is_new_service = (
351
351
notification_type not in ssdp_device .advertisement_headers
352
352
and notification_type not in ssdp_device .search_headers
@@ -356,7 +356,7 @@ def see_advertisement(
356
356
"See new service: %s, type: %s" , ssdp_device , notification_type
357
357
)
358
358
359
- notification_sub_type : NotificationSubType = headers [ "NTS" ]
359
+ notification_sub_type : NotificationSubType = headers . get_lower ( "nts" )
360
360
propagate = (
361
361
notification_sub_type == NotificationSubType .SSDP_UPDATE
362
362
or is_new_device
@@ -407,8 +407,8 @@ def _see_device(
407
407
new_location = location_changed (ssdp_device , headers )
408
408
409
409
# Update device.
410
- ssdp_device .add_location (headers [ "location" ] , valid_to )
411
- ssdp_device .last_seen = headers [ "_timestamp" ]
410
+ ssdp_device .add_location (headers . get_lower ( "location" ) , valid_to )
411
+ ssdp_device .last_seen = headers . get_lower ( "_timestamp" )
412
412
if not self .next_valid_to or self .next_valid_to > ssdp_device .valid_to :
413
413
self .next_valid_to = ssdp_device .valid_to
414
414
@@ -433,7 +433,7 @@ def unsee_advertisement(
433
433
del self .devices [udn ]
434
434
435
435
# Update device before propagating it
436
- notification_type : NotificationType = headers [ "NT" ]
436
+ notification_type : NotificationType = headers . get_lower ( "nt" )
437
437
if notification_type in ssdp_device .advertisement_headers :
438
438
ssdp_device .advertisement_headers [notification_type ].replace (headers )
439
439
else :
0 commit comments