Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 56 additions & 51 deletions pyW215/pyW215.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SmartPlug(object):
"""

def __init__(self, ip, password, user="admin",
use_legacy_protocol=False):
use_legacy_protocol=False, auth_interval=10, retry_count=1):
"""
Create a new SmartPlug instance identified by the given URL and password.

Expand All @@ -49,16 +49,20 @@ def __init__(self, ip, password, user="admin",
:param password: Password to authenticate with the plug. Located on the plug.
:param user: Username for the plug. Default is admin.
:param use_legacy_protocol: Support legacy firmware versions. Default is False.
:param auth_interval: Number of seconds between re-authentications. Default is 10 seconds.
:param retry_count: Number of times to retry failed SOAP call before giving up. Default is 1.
"""
self.ip = ip
self.url = "http://{}/HNAP1/".format(ip)
self.user = user
self.password = password
self.use_legacy_protocol = use_legacy_protocol
self.authenticated = None
# Dict with authentication data {"key": PrivateKey, "cookie": Cookie, "authtime": time of authentication (epoch)}
self.auth = None
self.auth_interval = auth_interval
self.retry_count = retry_count
if self.use_legacy_protocol:
_LOGGER.info("Enabled support for legacy firmware.")
self._error_report = False
self.model_name = self.SOAPAction(Action="GetDeviceSettings", responseElement="ModelName", params="")

def moduleParameters(self, module):
Expand Down Expand Up @@ -113,56 +117,43 @@ def requestBody(self, Action, params):
</soap:Envelope>
'''.format(Action, params, Action)

def SOAPAction(self, Action, responseElement, params="", recursive=False):
"""Generate the SOAP action call.
def AuthAndSOAPAction(self, Action, responseElement, params=""):
"""Authenticate as needed and send the SOAP action call.

:type Action: str
:type responseElement: str
:type params: str
:type recursive: bool
:param Action: The action to perform on the device
:param responseElement: The XML element that is returned upon success
:param params: Any additional parameters required for performing request (i.e. RadioID, moduleID, ect)
:param recursive: True if first attempt failed and now attempting to re-authenticate prior
:return: Text enclosed in responseElement brackets
"""
# Authenticate client
if self.authenticated is None:
self.authenticated = self.auth()
auth = self.authenticated
# If not legacy protocol, ensure auth() is called for every call
if not self.use_legacy_protocol:
self.authenticated = None

if auth is None:
# Authenticate client if not authenticated or last authentication is too old
if (self.auth is None or (time.time() - self.auth["authtime"]) > self.auth_interval):
self.auth = self.authenticate()

if self.auth is None:
return None
payload = self.requestBody(Action, params)

# Timestamp in microseconds
time_stamp = str(round(time.time() / 1e6))

action_url = '"http://purenetworks.com/HNAP1/{}"'.format(Action)
AUTHKey = hmac.new(auth[0].encode(), (time_stamp + action_url).encode(), digestmod=hashlib.md5).hexdigest().upper() + " " + time_stamp
AUTHKey = hmac.new(self.auth["key"].encode(), (time_stamp + action_url).encode(), digestmod=hashlib.md5).hexdigest().upper() + " " + time_stamp

headers = {'Content-Type': '"text/xml; charset=utf-8"',
'SOAPAction': '"http://purenetworks.com/HNAP1/{}"'.format(Action),
'HNAP_AUTH': '{}'.format(AUTHKey),
'Cookie': 'uid={}'.format(auth[1])}
'Cookie': 'uid={}'.format(self.auth["cookie"])}

try:
response = urlopen(Request(self.url, payload.encode(), headers))
except (HTTPError, URLError):
# Try to re-authenticate once
self.authenticated = None
# Recursive call to retry action
if not recursive:
return_value = self.SOAPAction(Action, responseElement, params, True)
if recursive or return_value is None:
_LOGGER.warning("Failed to open url to {}".format(self.ip))
self._error_report = True
return None
else:
return return_value
_LOGGER.warning("Failed to open url to %s", self.ip)
# Invalidate authentication as well
self.auth = None
return None

xmlData = response.read().decode()
root = ET.fromstring(xmlData)
Expand All @@ -171,24 +162,43 @@ def SOAPAction(self, Action, responseElement, params="", recursive=False):
try:
value = root.find('.//{http://purenetworks.com/HNAP1/}%s' % (responseElement)).text
except AttributeError:
_LOGGER.warning("Unable to find %s in response." % responseElement)
_LOGGER.warning("Unable to find %s in response.", responseElement)
_LOGGER.debug("Response: %s", repr(xmlData))
return None

if value is None and self._error_report is False:
_LOGGER.warning("Could not find %s in response." % responseElement)
self._error_report = True
if value is None:
_LOGGER.warning("Could not find %s in response.", responseElement)
_LOGGER.debug("Response: %s", repr(xmlData))
return None

self._error_report = False
return value

def SOAPAction(self, Action, responseElement, params=""):
"""Generate the SOAP action call. Retry on error as configured.

:type Action: str
:type responseElement: str
:type params: str
:param Action: The action to perform on the device
:param responseElement: The XML element that is returned upon success
:param params: Any additional parameters required for performing request (i.e. RadioID, moduleID, ect)
:return: Text enclosed in responseElement brackets
"""
response = None
tries = 0
while(response is None and tries <= self.retry_count):
tries += 1
_LOGGER.debug("SOAPAction #%s %s", tries, Action)
response = self.AuthAndSOAPAction(Action, responseElement, params)

return response

def fetchMyCgi(self):
"""Fetches statistics from my_cgi.cgi"""
try:
response = urlopen(Request('http://{}/my_cgi.cgi'.format(self.ip), b'request=create_chklst'));
except (HTTPError, URLError):
_LOGGER.warning("Failed to open url to {}".format(self.ip))
self._error_report = True
_LOGGER.warning("Failed to open url to %s", self.ip)
return None

lines = response.readlines()
Expand Down Expand Up @@ -277,7 +287,7 @@ def state(self):
elif response.lower() == 'false':
return OFF
else:
_LOGGER.warning("Unknown state %s returned" % str(response.lower()))
_LOGGER.warning("Unknown state %s returned", response)
return 'unknown'

@state.setter
Expand All @@ -298,7 +308,7 @@ def get_state(self):
"""Get the device state (i.e. ON or OFF)."""
return self.state

def auth(self):
def authenticate(self):
"""Authenticate using the SOAP interface.

Authentication is a two-step process. First a initial payload
Expand All @@ -312,6 +322,7 @@ def auth(self):

See https://github.com/bikerp/dsp-w215-hnap/wiki/Authentication-process for more information.
"""
_LOGGER.debug("Authenticating to %s as %s", self.url, self.user)

payload = self.initial_auth_payload()

Expand All @@ -323,9 +334,7 @@ def auth(self):
try:
response = urlopen(Request(self.url, payload, headers))
except URLError:
if self._error_report is False:
_LOGGER.warning('Unable to open a connection to dlink switch {}'.format(self.ip))
self._error_report = True
_LOGGER.warning('Unable to open a connection to dlink switch %s', self.ip)
return None
xmlData = response.read().decode()
root = ET.fromstring(xmlData)
Expand All @@ -336,12 +345,9 @@ def auth(self):
PublickeyResponse = root.find('.//{http://purenetworks.com/HNAP1/}PublicKey')

if (
ChallengeResponse == None or CookieResponse == None or PublickeyResponse == None) and self._error_report is False:
ChallengeResponse == None or CookieResponse == None or PublickeyResponse == None):
_LOGGER.warning("Failed to receive initial authentication from smartplug.")
self._error_report = True
return None

if self._error_report is True:
_LOGGER.debug("Response: %s", repr(xmlData))
return None

Challenge = ChallengeResponse.text
Expand All @@ -365,13 +371,12 @@ def auth(self):
# Find responses
login_status = root.find('.//{http://purenetworks.com/HNAP1/}LoginResult').text.lower()

if login_status != "success" and self._error_report is False:
_LOGGER.error("Failed to authenticate with SmartPlug {}".format(self.ip))
self._error_report = True
if login_status != "success":
_LOGGER.error("Failed to authenticate with SmartPlug %s", self.ip)
_LOGGER.debug("Response: %s", repr(xmlData))
return None

self._error_report = False # Reset error logging
return (PrivateKey, Cookie)
return {"key": PrivateKey, "cookie": Cookie, "authtime": time.time()}

def initial_auth_payload(self):
"""Return the initial authentication payload."""
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
long_description = f.read()

setup(name='pyW215',
version='0.7.0',
version='0.7.1',
description='Interface for d-link W215 Smart Plugs.',
long_description=long_description,
url='https://github.com/linuxchristian/pyW215',
Expand Down