Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 60 additions & 56 deletions pyW215/pyW215.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

try:
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
Expand All @@ -13,10 +12,22 @@

_LOGGER = logging.getLogger(__name__)


ON = 'ON'
OFF = 'OFF'

W215_A2_DEV_ID = {
'SWITCH': 1,
'THERMAL': 2,
'POWERMETER': 3
}

DEFAULT_DEV_ID = {
'SWITCH': 1,
'POWERMETER': 2,
'THERMAL': 3
}


class SmartPlug(object):
"""
Class to access:
Expand All @@ -38,8 +49,8 @@ class SmartPlug(object):
Class layout is inspired by @rkabadi (https://github.com/rkabadi) for the Edimax Smart plug.
"""

def __init__(self, ip, password, user = "admin",
use_legacy_protocol = False):
def __init__(self, ip, password, user="admin",
use_legacy_protocol=False):
"""
Create a new SmartPlug instance identified by the given URL and password.

Expand All @@ -57,6 +68,19 @@ def __init__(self, ip, password, user = "admin",
if self.use_legacy_protocol:
_LOGGER.info("Enabled support for legacy firmware.")
self._error_report = False
self.dev_id = DEFAULT_DEV_ID
if self.use_legacy_protocol:
try:
hw_ver = self.fetchMyCgi()['HW Ver']
# change dev map for HW A2
if hw_ver == 'A2':
self.dev_id = W215_A2_DEV_ID
# this assume the model number is W215, might be wrong
self.model_name = 'W215_' + hw_ver
except:
self.model_name = 'Unknown'
else:
self.model_name = self.SOAPAction("", "ModelName", "")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we change this to

self.model_name = self.SOAPAction(Action="", responseElement="ModelName", params = "")

As suggested in the PR from @clach04. It is much more readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#18 is now merged


def moduleParameters(self, module):
"""Returns moduleID XML.
Expand All @@ -76,7 +100,7 @@ def controlParameters(self, module, status):
:param status: The state to set (i.e. true (on) or false (off))
:return XML string to join with payload
"""
if self.use_legacy_protocol :
if self.use_legacy_protocol:
return '''{}<NickName>Socket 1</NickName><Description>Socket 1</Description>
<OPStatus>{}</OPStatus><Controller>1</Controller>'''.format(self.moduleParameters(module), status)
else:
Expand All @@ -91,7 +115,6 @@ def radioParameters(self, radio):
"""
return '''<RadioID>{}</RadioID>'''.format(radio)


def requestBody(self, Action, params):
"""Returns the request payload for an action as XML>.

Expand All @@ -111,7 +134,7 @@ def requestBody(self, Action, params):
</soap:Envelope>
'''.format(Action, params, Action)

def SOAPAction(self, Action, responseElement, params = ""):
def SOAPAction(self, Action, responseElement, params=""):
"""Generate the SOAP action call.

:type Action: str
Expand All @@ -130,15 +153,15 @@ def SOAPAction(self, Action, responseElement, params = ""):
payload = self.requestBody(Action, params)

# Timestamp in microseconds
time_stamp = str(round(time.time()/1e6))
time_stamp = str(round(time.time() / 1e6))

action_url = '"http://purenetworks.com/HNAP1/{}"'.format(Action)
AUTHKey = hmac.new(auth[0].encode(), (time_stamp+action_url).encode()).hexdigest().upper() + " " + time_stamp
AUTHKey = hmac.new(auth[0].encode(), (time_stamp + action_url).encode()).hexdigest().upper() + " " + time_stamp

headers = {'Content-Type' : '"text/xml; charset=utf-8"',
headers = {'Content-Type': '"text/xml; charset=utf-8"',
'SOAPAction': '"http://purenetworks.com/HNAP1/{}"'.format(Action),
'HNAP_AUTH' : '{}'.format(AUTHKey),
'Cookie' : 'uid={}'.format(auth[1])}
'HNAP_AUTH': '{}'.format(AUTHKey),
'Cookie': 'uid={}'.format(auth[1])}

try:
response = urlopen(Request(self.url, payload.encode(), headers))
Expand All @@ -152,7 +175,7 @@ def SOAPAction(self, Action, responseElement, params = ""):

# Get value from device
try:
value = root.find('.//{http://purenetworks.com/HNAP1/}%s' % (responseElement)).text
value = root.find('.//{http://purenetworks.com/HNAP1/}%s' % responseElement).text
except AttributeError:
_LOGGER.warning("Unable to find %s in response." % responseElement)
return None
Expand All @@ -168,7 +191,7 @@ def SOAPAction(self, Action, responseElement, params = ""):
def fetchMyCgi(self):
"""Fetches statistics from my_cgi.cgi"""
try:
response = urlopen(Request('http://{}/my_cgi.cgi'.format(self.ip), b'request=create_chklst'));
response = urlopen(Request('http://{}/my_cgi.cgi'.format(self.ip), b'request=create_chklst'))
except (HTTPError, URLError):
_LOGGER.warning("Failed to open url to {}".format(self.ip))
self._error_report = True
Expand All @@ -180,58 +203,37 @@ def fetchMyCgi(self):
@property
def current_consumption(self):
"""Get the current power consumption in Watt."""
res = 'N/A'
if self.use_legacy_protocol:
# Use /my_cgi.cgi to retrieve current consumption
try:
res = self.fetchMyCgi()['Meter Watt']
except:
return 'N/A'
else:
try:
res = self.SOAPAction('GetCurrentPowerConsumption', 'CurrentConsumption', self.moduleParameters("2"))
except:
return 'N/A'

if res is None:
try:
res = self.SOAPAction('GetCurrentPowerConsumption', 'CurrentConsumption',
self.moduleParameters(self.dev_id['POWERMETER']))
except:
return 'N/A'

try:
res = float(res)
except ValueError:
_LOGGER.error("Failed to retrieve current power consumption from SmartPlug")

return res

@property
def total_consumption(self):
"""Get the total power consumpuntion in the device lifetime."""
if self.use_legacy_protocol:
# TotalConsumption currently fails on the legacy protocol and
# creates a mess in the logs. Just return 'N/A' for now.
return 'N/A'

res = 'N/A'
try:
res = self.SOAPAction("GetPMWarningThreshold", "TotalConsumption", self.moduleParameters("2"))
res = self.SOAPAction("GetPMWarningThreshold", "TotalConsumption",
self.moduleParameters(self.dev_id['POWERMETER']))
except:
return 'N/A'

if res is None:
return 'N/A'

try:
float(res)
except ValueError:
_LOGGER.error("Failed to retrieve total power consumption from SmartPlug")

return res

@property
def temperature(self):
"""Get the device temperature in celsius."""
try:
res = self.SOAPAction('GetCurrentTemperature', 'CurrentTemperature', self.moduleParameters("3"))
res = self.SOAPAction('GetCurrentTemperature', 'CurrentTemperature',
self.moduleParameters(self.dev_id['THERMAL']))
except:
res = 'N/A'

Expand All @@ -240,7 +242,7 @@ def temperature(self):
@property
def state(self):
"""Get the device state (i.e. ON or OFF)."""
response = self.SOAPAction('GetSocketSettings', 'OPStatus', self.moduleParameters("1"))
response = self.SOAPAction('GetSocketSettings', 'OPStatus', self.moduleParameters(self.dev_id['SWITCH']))
if response is None:
return 'unknown'
elif response.lower() == 'true':
Expand All @@ -259,9 +261,11 @@ def state(self, value):
:param value: Future state (either ON or OFF)
"""
if value.upper() == ON:
return self.SOAPAction('SetSocketSettings', 'SetSocketSettingsResult', self.controlParameters("1", "true"))
return self.SOAPAction('SetSocketSettings', 'SetSocketSettingsResult',
self.controlParameters(self.dev_id['SWITCH'], "true"))
elif value.upper() == OFF:
return self.SOAPAction('SetSocketSettings', 'SetSocketSettingsResult', self.controlParameters("1", "false"))
return self.SOAPAction('SetSocketSettings', 'SetSocketSettingsResult',
self.controlParameters(self.dev_id['SWITCH'], "false"))
else:
raise TypeError("State %s is not valid." % str(value))

Expand All @@ -283,8 +287,8 @@ def auth(self):
payload = self.initial_auth_payload()

# Build initial header
headers = {'Content-Type' : '"text/xml; charset=utf-8"',
'SOAPAction': '"http://purenetworks.com/HNAP1/Login"'}
headers = {'Content-Type': '"text/xml; charset=utf-8"',
'SOAPAction': '"http://purenetworks.com/HNAP1/Login"'}

# Request privatekey, cookie and challenge
try:
Expand All @@ -302,21 +306,21 @@ def auth(self):
Cookie = root.find('.//{http://purenetworks.com/HNAP1/}Cookie').text
Publickey = root.find('.//{http://purenetworks.com/HNAP1/}PublicKey').text

if (Challenge == None or Cookie == None or Publickey == None) and self._error_report is False:
if (Challenge is None or Cookie is None or Publickey is None) and self._error_report is False:
_LOGGER.warning("Failed to receive initial authentication from smartplug.")
self._error_report = True
return None

# Generate hash responses
PrivateKey = hmac.new((Publickey+self.password).encode(), (Challenge).encode()).hexdigest().upper()
PrivateKey = hmac.new((Publickey + self.password).encode(), Challenge.encode()).hexdigest().upper()
login_pwd = hmac.new(PrivateKey.encode(), Challenge.encode()).hexdigest().upper()

response_payload = self.auth_payload(login_pwd)
# Build response to initial request
headers = {'Content-Type' : '"text/xml; charset=utf-8"',
'SOAPAction': '"http://purenetworks.com/HNAP1/Login"',
'HNAP_AUTH' : '"{}"'.format(PrivateKey),
'Cookie' : 'uid={}'.format(Cookie)}
headers = {'Content-Type': '"text/xml; charset=utf-8"',
'SOAPAction': '"http://purenetworks.com/HNAP1/Login"',
'HNAP_AUTH': '"{}"'.format(PrivateKey),
'Cookie': 'uid={}'.format(Cookie)}
response = urlopen(Request(self.url, response_payload, headers))
xmlData = response.read().decode()
root = ET.fromstring(xmlData)
Expand All @@ -329,8 +333,8 @@ def auth(self):
self._error_report = True
return None

self._error_report = False # Reset error logging
return (PrivateKey, Cookie)
self._error_report = False # Reset error logging
return PrivateKey, Cookie

def initial_auth_payload(self):
"""Return the initial authentication payload."""
Expand Down