|
1 | 1 | """Modbus Request/Response Decoders.""" |
2 | 2 | from __future__ import annotations |
3 | 3 |
|
4 | | -import pymodbus.pdu.bit_message as bit_msg |
5 | | -import pymodbus.pdu.diag_message as diag_msg |
6 | | -import pymodbus.pdu.file_message as file_msg |
7 | | -import pymodbus.pdu.mei_message as mei_msg |
8 | | -import pymodbus.pdu.other_message as o_msg |
9 | | -import pymodbus.pdu.pdu as base |
10 | | -import pymodbus.pdu.register_message as reg_msg |
11 | 4 | from pymodbus.exceptions import MessageRegisterException, ModbusException |
12 | 5 | from pymodbus.logging import Log |
13 | 6 |
|
| 7 | +from .pdu import ExceptionResponse, ModbusPDU |
| 8 | + |
14 | 9 |
|
15 | 10 | class DecodePDU: |
16 | 11 | """Decode pdu requests/responses (server/client).""" |
17 | 12 |
|
18 | | - _pdu_class_table: set[tuple[type[base.ModbusPDU], type[base.ModbusPDU]]] = { |
19 | | - (reg_msg.ReadHoldingRegistersRequest, reg_msg.ReadHoldingRegistersResponse), |
20 | | - (bit_msg.ReadDiscreteInputsRequest, bit_msg.ReadDiscreteInputsResponse), |
21 | | - (reg_msg.ReadInputRegistersRequest, reg_msg.ReadInputRegistersResponse), |
22 | | - (bit_msg.ReadCoilsRequest, bit_msg.ReadCoilsResponse), |
23 | | - (bit_msg.WriteMultipleCoilsRequest, bit_msg.WriteMultipleCoilsResponse), |
24 | | - (reg_msg.WriteMultipleRegistersRequest, reg_msg.WriteMultipleRegistersResponse), |
25 | | - (reg_msg.WriteSingleRegisterRequest, reg_msg.WriteSingleRegisterResponse), |
26 | | - (bit_msg.WriteSingleCoilRequest, bit_msg.WriteSingleCoilResponse), |
27 | | - (reg_msg.ReadWriteMultipleRegistersRequest, reg_msg.ReadWriteMultipleRegistersResponse), |
28 | | - (diag_msg.DiagnosticBase, diag_msg.DiagnosticBase), |
29 | | - (o_msg.ReadExceptionStatusRequest, o_msg.ReadExceptionStatusResponse), |
30 | | - (o_msg.GetCommEventCounterRequest, o_msg.GetCommEventCounterResponse), |
31 | | - (o_msg.GetCommEventLogRequest, o_msg.GetCommEventLogResponse), |
32 | | - (o_msg.ReportDeviceIdRequest, o_msg.ReportDeviceIdResponse), |
33 | | - (file_msg.ReadFileRecordRequest, file_msg.ReadFileRecordResponse), |
34 | | - (file_msg.WriteFileRecordRequest, file_msg.WriteFileRecordResponse), |
35 | | - (reg_msg.MaskWriteRegisterRequest, reg_msg.MaskWriteRegisterResponse), |
36 | | - (file_msg.ReadFifoQueueRequest, file_msg.ReadFifoQueueResponse), |
37 | | - (mei_msg.ReadDeviceInformationRequest, mei_msg.ReadDeviceInformationResponse), |
38 | | - } |
39 | | - |
40 | | - _pdu_sub_class_table: set[tuple[type[base.ModbusPDU], type[base.ModbusPDU]]] = { |
41 | | - (diag_msg.ReturnQueryDataRequest, diag_msg.ReturnQueryDataResponse), |
42 | | - (diag_msg.RestartCommunicationsOptionRequest, diag_msg.RestartCommunicationsOptionResponse), |
43 | | - (diag_msg.ReturnDiagnosticRegisterRequest, diag_msg.ReturnDiagnosticRegisterResponse), |
44 | | - (diag_msg.ChangeAsciiInputDelimiterRequest, diag_msg.ChangeAsciiInputDelimiterResponse), |
45 | | - (diag_msg.ForceListenOnlyModeRequest, diag_msg.ForceListenOnlyModeResponse), |
46 | | - (diag_msg.ClearCountersRequest, diag_msg.ClearCountersResponse), |
47 | | - (diag_msg.ReturnBusMessageCountRequest, diag_msg.ReturnBusMessageCountResponse), |
48 | | - (diag_msg.ReturnBusCommunicationErrorCountRequest, diag_msg.ReturnBusCommunicationErrorCountResponse), |
49 | | - (diag_msg.ReturnBusExceptionErrorCountRequest, diag_msg.ReturnBusExceptionErrorCountResponse), |
50 | | - (diag_msg.ReturnDeviceMessageCountRequest, diag_msg.ReturnDeviceMessageCountResponse), |
51 | | - (diag_msg.ReturnDeviceNoResponseCountRequest, diag_msg.ReturnDeviceNoResponseCountResponse), |
52 | | - (diag_msg.ReturnDeviceNAKCountRequest, diag_msg.ReturnDeviceNAKCountResponse), |
53 | | - (diag_msg.ReturnDeviceBusyCountRequest, diag_msg.ReturnDeviceBusyCountResponse), |
54 | | - (diag_msg.ReturnDeviceBusCharacterOverrunCountRequest, diag_msg.ReturnDeviceBusCharacterOverrunCountResponse), |
55 | | - (diag_msg.ReturnIopOverrunCountRequest, diag_msg.ReturnIopOverrunCountResponse), |
56 | | - (diag_msg.ClearOverrunCountRequest, diag_msg.ClearOverrunCountResponse), |
57 | | - (diag_msg.GetClearModbusPlusRequest, diag_msg.GetClearModbusPlusResponse), |
58 | | - (mei_msg.ReadDeviceInformationRequest, mei_msg.ReadDeviceInformationResponse), |
59 | | - } |
| 13 | + pdu_class_table: dict[int, tuple[type[ModbusPDU], type[ModbusPDU]]] = {} |
| 14 | + pdu_sub_class_table: dict[int, dict[int, tuple[type[ModbusPDU], type[ModbusPDU]]]] = {} |
60 | 15 |
|
61 | 16 | def __init__(self, is_server: bool) -> None: |
62 | 17 | """Initialize function_tables.""" |
63 | | - inx = 0 if is_server else 1 |
64 | | - self.lookup: dict[int, type[base.ModbusPDU]] = {cl[inx].function_code: cl[inx] for cl in self._pdu_class_table} |
65 | | - self.sub_lookup: dict[int, dict[int, type[base.ModbusPDU]]] = {} |
66 | | - for f in self._pdu_sub_class_table: |
67 | | - if (function_code := f[inx].function_code) not in self.sub_lookup: |
68 | | - self.sub_lookup[function_code] = {f[inx].sub_function_code: f[inx]} |
69 | | - else: |
70 | | - self.sub_lookup[function_code][f[inx].sub_function_code] = f[inx] |
| 18 | + self.inx = 0 if is_server else 1 |
| 19 | + |
| 20 | + @classmethod |
| 21 | + def add_pdu(cls, req: type[ModbusPDU], resp: type[ModbusPDU]): |
| 22 | + """Register request/response.""" |
| 23 | + cls.pdu_class_table[req.function_code] = (req, resp) |
71 | 24 |
|
72 | | - def lookupPduClass(self, data: bytes) -> type[base.ModbusPDU] | None: |
| 25 | + @classmethod |
| 26 | + def add_sub_pdu(cls, req: type[ModbusPDU], resp: type[ModbusPDU]): |
| 27 | + """Register request/response.""" |
| 28 | + if req.function_code not in cls.pdu_sub_class_table: |
| 29 | + cls.pdu_sub_class_table[req.function_code] = {} |
| 30 | + cls.pdu_sub_class_table[req.function_code][req.sub_function_code] = (req, resp) |
| 31 | + |
| 32 | + def lookupPduClass(self, data: bytes) -> type[ModbusPDU] | None: |
73 | 33 | """Use `function_code` to determine the class of the PDU.""" |
74 | 34 | func_code = int(data[1]) |
75 | 35 | if func_code & 0x80: |
76 | | - return base.ExceptionResponse |
| 36 | + return ExceptionResponse |
77 | 37 | if func_code == 0x2B: # mei message, sub_function_code is 1 byte |
78 | 38 | sub_func_code = int(data[2]) |
79 | | - return self.sub_lookup[func_code].get(sub_func_code, None) |
| 39 | + if not (type_class := self.pdu_sub_class_table[func_code].get(sub_func_code, None)): |
| 40 | + return None |
| 41 | + return type_class[self.inx] |
| 42 | + |
80 | 43 | if func_code == 0x08: # diag message, sub_function_code is 2 bytes |
81 | 44 | sub_func_code = int(data[3]) |
82 | | - return self.sub_lookup[func_code].get(sub_func_code, None) |
83 | | - return self.lookup.get(func_code, None) |
| 45 | + if not (type_class := self.pdu_sub_class_table[func_code].get(sub_func_code, None)): |
| 46 | + return None |
| 47 | + return type_class[self.inx] |
| 48 | + if not (type_class := self.pdu_class_table.get(func_code, None)): |
| 49 | + return None |
| 50 | + return type_class[self.inx] |
84 | 51 |
|
85 | | - def register(self, custom_class: type[base.ModbusPDU]) -> None: |
| 52 | + def register(self, custom_class: type[ModbusPDU]) -> None: |
86 | 53 | """Register a function and sub function class with the decoder.""" |
87 | | - if not issubclass(custom_class, base.ModbusPDU): |
| 54 | + if not issubclass(custom_class, ModbusPDU): |
88 | 55 | raise MessageRegisterException( |
89 | 56 | f'"{custom_class.__class__.__name__}" is Not a valid Modbus Message' |
90 | 57 | ". Class needs to be derived from " |
91 | 58 | "`pymodbus.pdu.ModbusPDU` " |
92 | 59 | ) |
93 | | - self.lookup[custom_class.function_code] = custom_class |
| 60 | + self.pdu_class_table[custom_class.function_code] = (custom_class, custom_class) |
94 | 61 | if custom_class.sub_function_code >= 0: |
95 | | - if custom_class.function_code not in self.sub_lookup: |
96 | | - self.sub_lookup[custom_class.function_code] = {} |
97 | | - self.sub_lookup[custom_class.function_code][ |
| 62 | + if custom_class.function_code not in self.pdu_sub_class_table: |
| 63 | + self.pdu_sub_class_table[custom_class.function_code] = {} |
| 64 | + self.pdu_sub_class_table[custom_class.function_code][ |
98 | 65 | custom_class.sub_function_code |
99 | | - ] = custom_class |
| 66 | + ] = (custom_class, custom_class) |
100 | 67 |
|
101 | | - def decode(self, frame: bytes) -> base.ModbusPDU | None: |
| 68 | + def decode(self, frame: bytes) -> ModbusPDU | None: |
102 | 69 | """Decode a frame.""" |
103 | 70 | try: |
104 | 71 | if (function_code := int(frame[0])) > 0x80: |
105 | | - pdu_exp = base.ExceptionResponse(function_code & 0x7F) |
| 72 | + pdu_exp = ExceptionResponse(function_code & 0x7F) |
106 | 73 | pdu_exp.decode(frame[1:]) |
107 | 74 | return pdu_exp |
108 | | - if not (pdu_class := self.lookup.get(function_code, None)): |
| 75 | + if not (pdu_class := self.pdu_class_table.get(function_code, None)): |
109 | 76 | Log.debug("decode PDU failed for function code {}", function_code) |
110 | 77 | raise ModbusException(f"Unknown response {function_code}") |
111 | | - pdu = pdu_class() |
| 78 | + pdu = pdu_class[self.inx]() |
112 | 79 | pdu.decode(frame[1:]) |
113 | 80 | if pdu.sub_function_code >= 0: |
114 | | - lookup = self.sub_lookup.get(pdu.function_code, {}) |
115 | | - if sub_class := lookup.get(pdu.sub_function_code, None): |
116 | | - pdu = sub_class() |
| 81 | + lookup = self.pdu_sub_class_table.get(pdu.function_code, {}) |
| 82 | + if type_class := lookup.get(pdu.sub_function_code, None): |
| 83 | + pdu = type_class[self.inx]() |
117 | 84 | pdu.decode(frame[1:]) |
118 | 85 | Log.debug("decoded PDU function_code({} sub {}) -> {} ", pdu.function_code, pdu.sub_function_code, str(pdu)) |
119 | 86 | return pdu |
|
0 commit comments