Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 52 additions & 81 deletions midealocal/devices/ac/message.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Midea AC message."""

import logging
from collections.abc import Callable, Mapping
from enum import IntEnum
from types import MappingProxyType

from midealocal.const import MAX_BYTE_VALUE, DeviceType
from midealocal.crc8 import calculate
Expand All @@ -19,7 +21,7 @@
BB_AC_MODES = [0, 3, 1, 2, 4, 5]
BB_MIN_BODY_LENGTH = 21
CONFORT_MODE_MIN_LENGTH = 16
CONFORT_MODE_MIN_LENGTH = 23
CONFORT_MODE_MIN_LENGTH2 = 23
SMART_DRY_MIN_LENGTH = 20
SWING_LR_MIN_LENGTH = 21
FRESH_AIR_C0_MIN_LENGTH = 29
Expand All @@ -39,12 +41,14 @@
XC1_SUBBODY_TYPE_45 = 0x45


class PowerAnalysisMethod(IntEnum):
"""AC Power analysis method."""
class PowerFormats(IntEnum):
"""AC Power/Energy analysis formats."""

TYPE_1 = 1
TYPE_2 = 2
TYPE_3 = 3
# unless stated, consumption / energy is 0.01 kWh, and power in 0.1 W resolution
BCD = 1
BINARY = 2 # binary with energy in 0.1 kWh resolution
MIXED = 3 # mixed/INT (byte = 0-99)
BINARY1 = 12 # binary


class NewProtocolQuery(IntEnum):
Expand All @@ -68,7 +72,7 @@ class NewProtocolTags(IntEnum):
fresh_air_2 = 0x004B # queryType == "fresh_air"
prevent_super_cool = 0x0049
auto_prevent_straight_wind = 0x0226
self_clean = 0x0039 # self_clean query can't return response
self_clean = 0x0039 # self_clean query can't return response
wind_straight = 0x0032
wind_avoid = 0x0033
intelligent_wind = 0x0034
Expand Down Expand Up @@ -430,17 +434,14 @@ def _body(self) -> bytearray:
_subprotocol_body = self._subprotocol_body
_body = bytearray(
[
6
+ 2
+ (len(_subprotocol_body) if _subprotocol_body is not None else 0),
6 + 2 + len(_subprotocol_body),
0x00,
0xFF,
0xFF,
self._subprotocol_query_type,
],
)
if _subprotocol_body is not None:
_body.extend(_subprotocol_body)
_body.extend(_subprotocol_body)
return _body


Expand Down Expand Up @@ -580,7 +581,7 @@ def _body(self) -> bytearray:
# Byte2, mode target_temperature
mode = (self.mode << 5) & 0xE0
target_temperature = (int(self.target_temperature) & 0xF) | (
0x10 if int(round(self.target_temperature * 2)) % 2 != 0 else 0
0x10 if round(self.target_temperature * 2) % 2 != 0 else 0
)
# Byte 3, fan_speed
fan_speed = int(self.fan_speed) & 0x7F
Expand Down Expand Up @@ -979,7 +980,7 @@ def __init__(self, body: bytearray) -> None:
)
# comfortPowerSave
self.comfort_mode = (
(body[22] & 0x1) > 0 if len(body) >= CONFORT_MODE_MIN_LENGTH else False
(body[22] & 0x1) > 0 if len(body) >= CONFORT_MODE_MIN_LENGTH2 else False
)
# smartDryValue
self.smart_dry = (
Expand All @@ -1004,37 +1005,18 @@ def __init__(self, body: bytearray, analysis_method: int = 3) -> None:
"""Initialize AC C1 message body."""
super().__init__(body)
if body[3] == XC1_SUBBODY_TYPE_44:

def parse_consumption(data: bytearray) -> float:
return self.parse_consumption(analysis_method, data)

# total_power_consumption
self.total_energy_consumption = XC1MessageBody.parse_consumption(
analysis_method,
body[4],
body[5],
body[6],
body[7],
)
self.total_energy_consumption = parse_consumption(body[4:8])
# total_operating_consumption
self.total_operating_consumption = XC1MessageBody.parse_consumption(
analysis_method,
body[8],
body[9],
body[10],
body[11],
)
self.total_operating_consumption = parse_consumption(body[8:12])
# current_operating_consumption
self.current_energy_consumption = XC1MessageBody.parse_consumption(
analysis_method,
body[12],
body[13],
body[14],
body[15],
)
self.current_energy_consumption = parse_consumption(body[12:16])
# current_time_power
self.realtime_power = XC1MessageBody.parse_power(
analysis_method,
body[16],
body[17],
body[18],
)
self.realtime_power = self.parse_power(analysis_method, body[16:19])
elif body[3] == XC1_SUBBODY_TYPE_40:
self.electrify_time_day = body[5] | (body[4] << 8)
self.electrify_time_hour = body[6]
Expand Down Expand Up @@ -1073,49 +1055,38 @@ def __init__(self, body: bytearray, analysis_method: int = 3) -> None:
# indoor humidity, it should be the same value as XBB/XA1 message
self.indoor_humidity = body[4] if body[4] != 0 else None

@staticmethod
def parse_value(byte: int) -> int:
power_analysis_methods: Mapping[int, Callable[[int, int], int]] = MappingProxyType(
{
PowerFormats.BCD: lambda byte, value: (
(byte >> 4) * 10 + (byte & 0x0F) + value * 100
),
PowerFormats.BINARY: lambda byte, value: byte + (value << 8),
PowerFormats.MIXED: lambda byte, value: byte + value * 100,
},
)

@classmethod
def parse_value(cls, analysis_method: int, databytes: bytearray) -> float:
"""AC C1 message body parse value."""
return (byte >> 4) * 10 + (byte & 0x0F)

@staticmethod
def parse_power(analysis_method: int, byte1: int, byte2: int, byte3: int) -> float:
if analysis_method not in PowerFormats._value2member_map_:
return 0.0 # unknown method
analysis_function = cls.power_analysis_methods[analysis_method % 10]
value = 0
for byte in databytes:
value = analysis_function(byte, value)
return float(value)

@classmethod
def parse_power(cls, analysis_method: int, databytes: bytearray) -> float:
"""AC C1 message body parse power."""
if analysis_method == PowerAnalysisMethod.TYPE_1:
return (
float(
XC1MessageBody.parse_value(byte1) * 10000
+ XC1MessageBody.parse_value(byte2) * 100
+ XC1MessageBody.parse_value(byte3),
)
/ 10
)
if analysis_method == PowerAnalysisMethod.TYPE_2:
return float((byte1 << 16) + (byte2 << 8) + byte3) / 10
return float(byte1 * 10000 + byte2 * 100 + byte3) / 10

@staticmethod
def parse_consumption(
analysis_method: int,
byte1: int,
byte2: int,
byte3: int,
byte4: int,
) -> float:
return cls.parse_value(analysis_method, databytes) / 10

@classmethod
def parse_consumption(cls, analysis_method: int, databytes: bytearray) -> float:
"""AC C1 message body parse consumption."""
if analysis_method == PowerAnalysisMethod.TYPE_1:
return (
float(
XC1MessageBody.parse_value(byte1) * 1000000
+ XC1MessageBody.parse_value(byte2) * 10000
+ XC1MessageBody.parse_value(byte3) * 100
+ XC1MessageBody.parse_value(byte4),
)
/ 100
)
if analysis_method == PowerAnalysisMethod.TYPE_2:
return float((byte1 << 32) + (byte2 << 16) + (byte3 << 8) + byte4) / 10
return float(byte1 * 1000000 + byte2 * 10000 + byte3 * 100 + byte4) / 100
# LSB = 0.01 kWh, except for default binary format = 0.1 kWh
divisor = 10 if analysis_method == PowerFormats.BINARY else 100
return cls.parse_value(analysis_method, databytes) / divisor


class XBBMessageBody(MessageBody):
Expand Down
17 changes: 12 additions & 5 deletions tests/devices/ac/message_ac_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,20 +652,23 @@ def test_message_query_c1_method1(self) -> None:
assert hasattr(response, "realtime_power")
assert response.realtime_power == expected_realtime_power

def test_message_query_c1_method2(self) -> None:
"""Test Message parse query C1 method2."""
def test_message_query_c1_method2(self, method: int = 2) -> None:
"""Test Message parse query C1 method2 (and 12)."""
self.header[9] = 0x03
body = bytearray(20)
body[0] = 0xC1 # Body type
body[3] = 0x44 # Set the type to 0x44

# method 12 is like 2, but with 0.01kWh resolution instead of 0.1kWh
energy_divisor = 10 if method == 2 else 100

# Total energy consumption bytes
body[4] = 0x01
body[5] = 0x23
body[6] = 0x45
body[7] = 0x67
expected_total_energy = (
float((0x01 << 32) + (0x23 << 16) + (0x45 << 8) + 0x67) / 10
float((0x01 << 24) + (0x23 << 16) + (0x45 << 8) + 0x67) / energy_divisor
)

# Current energy consumption bytes
Expand All @@ -674,7 +677,7 @@ def test_message_query_c1_method2(self) -> None:
body[14] = 0xCD
body[15] = 0xEF
expected_current_energy = (
float((0x89 << 32) + (0xAB << 16) + (0xCD << 8) + 0xEF) / 10
float((0x89 << 24) + (0xAB << 16) + (0xCD << 8) + 0xEF) / energy_divisor
)

# Real-time power bytes
Expand All @@ -683,7 +686,7 @@ def test_message_query_c1_method2(self) -> None:
body[18] = 0x56
expected_realtime_power = float((0x12 << 16) + (0x34 << 8) + 0x56) / 10

response = MessageACResponse(self.header + body, 2)
response = MessageACResponse(self.header + body, method)

assert hasattr(response, "total_energy_consumption")
assert response.total_energy_consumption == expected_total_energy
Expand All @@ -692,6 +695,10 @@ def test_message_query_c1_method2(self) -> None:
assert hasattr(response, "realtime_power")
assert response.realtime_power == expected_realtime_power

def test_message_query_c1_method12(self) -> None:
"""Test Message parse query C1 method12."""
self.test_message_query_c1_method2(12)

def test_message_query_c1_method3(self) -> None:
"""Test Message parse query C1 method3."""
self.header[9] = 0x03
Expand Down