diff --git a/custom_components/ocpp/api.py b/custom_components/ocpp/api.py index d97e1f49..dc472c13 100644 --- a/custom_components/ocpp/api.py +++ b/custom_components/ocpp/api.py @@ -290,10 +290,19 @@ async def on_connect(self, websocket: ServerConnection): await charge_point.reconnect(websocket) def _get_metrics(self, id: str): - """Return metrics.""" + """Return (cp_id, metrics mapping, cp instance, safe int num_connectors).""" cp_id = self.cpids.get(id, id) cp = self.charge_points.get(cp_id) - n_connectors = getattr(cp, "num_connectors", 1) or 1 + + def _safe_int(value, default=1): + try: + iv = int(value) + return iv if iv > 0 else default + except Exception: + return default + + n_connectors = _safe_int(getattr(cp, "num_connectors", 1), default=1) + return ( (cp_id, cp._metrics, cp, n_connectors) if cp is not None diff --git a/custom_components/ocpp/chargepoint.py b/custom_components/ocpp/chargepoint.py index 9d427795..36d717fa 100644 --- a/custom_components/ocpp/chargepoint.py +++ b/custom_components/ocpp/chargepoint.py @@ -363,8 +363,18 @@ async def post_connect(self): if prof.REM in self._attr_supported_features: if self.received_boot_notification is False: - await self.trigger_boot_notification() - await self.trigger_status_notification() + try: + await asyncio.wait_for( + self.trigger_boot_notification(), timeout=3 + ) + except Exception as ex: + _LOGGER.debug("trigger_boot_notification ignored: %s", ex) + try: + await asyncio.wait_for( + self.trigger_status_notification(), timeout=3 + ) + except Exception as ex: + _LOGGER.debug("trigger_status_notification ignored: %s", ex) except Exception as e: _LOGGER.debug("post_connect aborted non-fatally: %s", e) @@ -464,41 +474,48 @@ async def monitor_connection(self): self._metrics[(0, cstat.latency_pong.value)].unit = "ms" connection = self._connection timeout_counter = 0 + # Add backstop to start post connect for non-compliant chargers # after 10s to allow for when a boot notification has not been received await asyncio.sleep(10) if not self.post_connect_success: self.hass.async_create_task(self.post_connect()) + while connection.state is State.OPEN: try: await asyncio.sleep(self.cs_settings.websocket_ping_interval) time0 = time.perf_counter() latency_ping = self.cs_settings.websocket_ping_timeout * 1000 + latency_pong = self.cs_settings.websocket_ping_timeout * 1000 pong_waiter = await asyncio.wait_for( connection.ping(), timeout=self.cs_settings.websocket_ping_timeout ) time1 = time.perf_counter() latency_ping = round(time1 - time0, 3) * 1000 - latency_pong = self.cs_settings.websocket_ping_timeout * 1000 + await asyncio.wait_for( pong_waiter, timeout=self.cs_settings.websocket_ping_timeout ) timeout_counter = 0 time2 = time.perf_counter() latency_pong = round(time2 - time1, 3) * 1000 + _LOGGER.debug( - f"Connection latency from '{self.cs_settings.csid}' to '{self.id}': ping={latency_ping} ms, pong={latency_pong} ms", + f"Connection latency from '{self.cs_settings.csid}' to '{self.id}': " + f"ping={latency_ping} ms, pong={latency_pong} ms", ) self._metrics[(0, cstat.latency_ping.value)].value = latency_ping self._metrics[(0, cstat.latency_pong.value)].value = latency_pong except TimeoutError as timeout_exception: + timeout_counter += 1 _LOGGER.debug( - f"Connection latency from '{self.cs_settings.csid}' to '{self.id}': ping={latency_ping} ms, pong={latency_pong} ms", + f"Connection latency from '{self.cs_settings.csid}' to '{self.id}': " + f"ping={latency_ping} ms, pong={latency_pong} ms", ) self._metrics[(0, cstat.latency_ping.value)].value = latency_ping self._metrics[(0, cstat.latency_pong.value)].value = latency_pong - timeout_counter += 1 + if timeout_counter > self.cs_settings.websocket_ping_tries: _LOGGER.debug( f"Connection to '{self.id}' timed out after '{self.cs_settings.websocket_ping_tries}' ping tries", @@ -506,6 +523,9 @@ async def monitor_connection(self): raise timeout_exception else: continue + except Exception as ex: + _LOGGER.debug(f"monitor_connection stopping due to exception: {ex}") + break async def _handle_call(self, msg): try: @@ -646,8 +666,15 @@ def get_authorization_status(self, id_tag): ) return auth_status - def process_phases(self, data: list[MeasurandValue], connector_id: int | None = 0): - """Process phase data from meter values.""" + def process_phases(self, data: list[MeasurandValue], connector_id: int = 0): + """Process per-phase MeterValues and aggregate them into per-connector metrics. + + Rules: + - Voltage: average (L1-N/L2-N/L3-N or L-L divided by √3); fall back to averaging L1/L2/L3 if needed. + - Current.*: average of L1/L2/L3 (ignore N). + - Power.Factor: **average** of L1/L2/L3 (ignore N). *Do not sum; unit is dimensionless and may be missing.* + - Other (e.g. Power.Active.*): sum of L1/L2/L3 (ignore N). + """ # For single-connector chargers, use connector 1. n_connectors = getattr(self, CONF_NUM_CONNECTORS, DEFAULT_NUM_CONNECTORS) or 1 if connector_id in (None, 0): @@ -658,13 +685,13 @@ def process_phases(self, data: list[MeasurandValue], connector_id: int | None = except Exception: target_cid = 1 if n_connectors == 1 else 0 - def average_of_nonzero(values): - nonzero_values: list = [v for v in values if v != 0.0] - nof_values: int = len(nonzero_values) - average = sum(nonzero_values) / nof_values if nof_values > 0 else 0 - return average + def average_of_nonzero(values: list[float]) -> float: + """Average only non-zero values; return 0.0 if all are zero or list is empty.""" + nonzero = [v for v in values if v != 0.0] + return (sum(nonzero) / len(nonzero)) if nonzero else 0.0 + + measurand_data: dict[str, dict[str, float]] = {} - measurand_data = {} for item in data: # create ordered Dict for each measurand, eg {"voltage":{"unit":"V","L1-N":"230"...}} measurand = item.measurand @@ -672,64 +699,115 @@ def average_of_nonzero(values): value = item.value unit = item.unit context = item.context - if measurand is not None and phase is not None and unit is not None: - if measurand not in measurand_data: - measurand_data[measurand] = {} + + if measurand is None or phase is None: + continue + + if measurand not in measurand_data: + measurand_data[measurand] = {} + + if unit is not None: measurand_data[measurand][om.unit.value] = unit - measurand_data[measurand][phase] = value self._metrics[(target_cid, measurand)].unit = unit self._metrics[(target_cid, measurand)].extra_attr[om.unit.value] = unit - self._metrics[(target_cid, measurand)].extra_attr[phase] = value + + measurand_data[measurand][phase] = value + self._metrics[(target_cid, measurand)].extra_attr[phase] = value + if context is not None: self._metrics[(target_cid, measurand)].extra_attr[om.context.value] = ( context ) - line_phases = [Phase.l1.value, Phase.l2.value, Phase.l3.value, Phase.n.value] + line_phases_all = [ + Phase.l1.value, + Phase.l2.value, + Phase.l3.value, + Phase.n.value, + ] + phases_l123 = [Phase.l1.value, Phase.l2.value, Phase.l3.value] line_to_neutral_phases = [Phase.l1_n.value, Phase.l2_n.value, Phase.l3_n.value] line_to_line_phases = [Phase.l1_l2.value, Phase.l2_l3.value, Phase.l3_l1.value] + def _avg_l123(phase_info: dict) -> float: + return average_of_nonzero( + [phase_info.get(phase, 0.0) for phase in phases_l123] + ) + + def _sum_l123(phase_info: dict) -> float: + return sum(phase_info.get(phase, 0.0) for phase in phases_l123) + for metric, phase_info in measurand_data.items(): - metric_value = None + metric_value: float | None = None + mname = str(metric) + if metric in [Measurand.voltage.value]: if not phase_info.keys().isdisjoint(line_to_neutral_phases): # Line to neutral voltages are averaged metric_value = average_of_nonzero( - [phase_info.get(phase, 0) for phase in line_to_neutral_phases] + [phase_info.get(phase, 0.0) for phase in line_to_neutral_phases] ) elif not phase_info.keys().isdisjoint(line_to_line_phases): # Line to line voltages are averaged and converted to line to neutral metric_value = average_of_nonzero( - [phase_info.get(phase, 0) for phase in line_to_line_phases] + [phase_info.get(phase, 0.0) for phase in line_to_line_phases] ) / sqrt(3) - elif not phase_info.keys().isdisjoint(line_phases): + elif not phase_info.keys().isdisjoint(line_phases_all): # Workaround for chargers that don't follow engineering convention # Assumes voltages are line to neutral - metric_value = average_of_nonzero( - [phase_info.get(phase, 0) for phase in line_phases] - ) + metric_value = _avg_l123(phase_info) + else: - if not phase_info.keys().isdisjoint(line_phases): - metric_value = sum( - phase_info.get(phase, 0) for phase in line_phases - ) - elif not phase_info.keys().isdisjoint(line_to_neutral_phases): - # Workaround for some chargers that erroneously use line to neutral for current - metric_value = sum( - phase_info.get(phase, 0) for phase in line_to_neutral_phases - ) + is_current = mname.lower().startswith("current") + if is_current: + # Current.* shown per phase -> avg of L1/L2/L3, ignore N + if not phase_info.keys().isdisjoint(phases_l123): + metric_value = _avg_l123(phase_info) + elif not phase_info.keys().isdisjoint(line_to_neutral_phases): + # Workaround for some chargers that erroneously use line to neutral for current + metric_value = average_of_nonzero( + [ + phase_info.get(phase, 0.0) + for phase in line_to_neutral_phases + ] + ) + + # Special-case: Power.Factor must be averaged, never summed + elif metric == Measurand.power_factor.value: + if not phase_info.keys().isdisjoint(phases_l123): + metric_value = _avg_l123(phase_info) + elif not phase_info.keys().isdisjoint(line_to_neutral_phases): + metric_value = average_of_nonzero( + [phase_info.get(p, 0.0) for p in line_to_neutral_phases] + ) + # If only a single phase value exists, just pass it through + else: + metric_value = next( + (v for k, v in phase_info.items() if k != om.unit.value), + None, + ) + + else: + # Other (e.g. Power.*): total is sum over phases + if not phase_info.keys().isdisjoint(phases_l123): + metric_value = _sum_l123(phase_info) + elif not phase_info.keys().isdisjoint(line_to_neutral_phases): + metric_value = sum( + phase_info.get(phase, 0.0) + for phase in line_to_neutral_phases + ) if metric_value is not None: metric_unit = phase_info.get(om.unit.value) - m = self._metrics[(target_cid, metric)] + if metric_unit == DEFAULT_POWER_UNIT: - m.value = metric_value / 1000 - m.unit = HA_POWER_UNIT + self._metrics[(target_cid, metric)].value = metric_value / 1000 + self._metrics[(target_cid, metric)].unit = HA_POWER_UNIT elif metric_unit == DEFAULT_ENERGY_UNIT: - m.value = metric_value / 1000 - m.unit = HA_ENERGY_UNIT + self._metrics[(target_cid, metric)].value = metric_value / 1000 + self._metrics[(target_cid, metric)].unit = HA_ENERGY_UNIT else: - m.value = metric_value - m.unit = metric_unit + self._metrics[(target_cid, metric)].value = metric_value + self._metrics[(target_cid, metric)].unit = metric_unit @staticmethod def get_energy_kwh(measurand_value: MeasurandValue) -> float: @@ -803,6 +881,36 @@ def process_measurands( and is_transaction and self._ocpp_version != "1.6" ): + # Ensure session metric is present and well-formed + sess_key = (connector_id, "Energy.Session") + if sess_key not in self._metrics: + self._metrics[sess_key] = Metric(0.0, HA_ENERGY_UNIT) + else: + if self._metrics[sess_key].unit is None: + self._metrics[sess_key].unit = HA_ENERGY_UNIT + if self._metrics[sess_key].value is None: + self._metrics[sess_key].value = 0.0 + + # Bootstrap baseline for 2.x if missing: + ms_key = ( + connector_id, + csess.meter_start.value, + ) # "Energy.Meter.Start" + if ms_key not in self._metrics: + # Create the slot with kWh unit to match normalized EAIR above + self._metrics[ms_key] = Metric(None, HA_ENERGY_UNIT) + + ms_metric = self._metrics[ms_key] + if ms_metric.value is None: + # First EAIR in this transaction: set baseline to current EAIR (kWh) + ms_metric.value = value + # Keep session at 0.0 for the baseline sample + else: + # Compute positive delta only (guard against counter resets) + delta = value - ms_metric.value + if delta >= 0: + self._metrics[sess_key].value = round(delta, 6) + if ( self._charger_reports_session_energy and context != ReadingContext.transaction_begin.value diff --git a/custom_components/ocpp/ocppv16.py b/custom_components/ocpp/ocppv16.py index 402984ab..0dce7193 100644 --- a/custom_components/ocpp/ocppv16.py +++ b/custom_components/ocpp/ocppv16.py @@ -152,53 +152,98 @@ async def get_heartbeat_interval(self): async def get_supported_measurands(self) -> str: """Get comma-separated list of measurands supported by the charger.""" - all_measurands = self.settings.monitored_variables - autodetect_measurands = self.settings.monitored_variables_autoconfig - + all_measurands = self.settings.monitored_variables or "" + autodetect_measurands = bool(self.settings.monitored_variables_autoconfig) key = ckey.meter_values_sampled_data.value + desired_csv = all_measurands.strip().strip(",") + cfg_ok = {ConfigurationStatus.accepted, ConfigurationStatus.reboot_required} + + effective_csv: str = "" + if autodetect_measurands: - accepted_measurands = [] - cfg_ok = [ - ConfigurationStatus.accepted, - ConfigurationStatus.reboot_required, - ] + # One-shot CSV attempt + if desired_csv: + _LOGGER.debug( + "'%s' attempting CSV set for measurands: %s", self.id, desired_csv + ) + try: + resp = await self.call( + call.ChangeConfiguration(key=key, value=desired_csv) + ) + if getattr(resp, "status", None) in cfg_ok: + _LOGGER.debug( + "'%s' measurands CSV accepted with status=%s", + self.id, + resp.status, + ) + effective_csv = desired_csv + else: + _LOGGER.debug( + "'%s' measurands CSV rejected with status=%s; falling back to GetConfiguration", + self.id, + getattr(resp, "status", None), + ) + except Exception as ex: + _LOGGER.debug( + "get_supported_measurands CSV set raised for '%s': %s", + self.id, + ex, + ) - for measurand in all_measurands.split(","): - _LOGGER.debug(f"'{self.id}' trying measurand: '{measurand}'") - req = call.ChangeConfiguration(key=key, value=measurand) - resp = await self.call(req) - if resp.status in cfg_ok: - _LOGGER.debug(f"'{self.id}' adding measurand: '{measurand}'") - accepted_measurands.append(measurand) + # Always read back what the charger actually has + chgr_csv = await self.get_configuration(key) - accepted_measurands = ",".join(accepted_measurands) - else: - accepted_measurands = all_measurands - - # Quirk: - # Workaround for a bug on chargers that have invalid MeterValuesSampledData - # configuration and reboot while the server requests MeterValuesSampledData. - # By setting the configuration directly without checking current configuration - # as done when calling self.configure, the server avoids charger reboot. - # Corresponding issue: https://github.com/lbbrhzn/ocpp/issues/1275 - if len(accepted_measurands) > 0: - req = call.ChangeConfiguration(key=key, value=accepted_measurands) - resp = await self.call(req) + if not effective_csv: _LOGGER.debug( - f"'{self.id}' measurands set manually to {accepted_measurands}" + "'%s' measurands not configurable by integration", self.id ) + _LOGGER.debug("'%s' allowed measurands: '%s'", self.id, chgr_csv) + return chgr_csv or "" - chgr_measurands = await self.get_configuration(key) + _LOGGER.debug( + "Returning accepted measurands for '%s': '%s'", self.id, effective_csv + ) + await self.configure(key, effective_csv) + return effective_csv + + # Non-autodetect path: + if desired_csv: + try: + resp = await self.call( + call.ChangeConfiguration(key=key, value=desired_csv) + ) + _LOGGER.debug( + "'%s' measurands set manually to %s", self.id, desired_csv + ) + if getattr(resp, "status", None) in cfg_ok: + effective_csv = desired_csv + else: + _LOGGER.debug( + "'%s' manual measurands set not accepted (status=%s); using charger's value", + self.id, + getattr(resp, "status", None), + ) + effective_csv = await self.get_configuration(key) + except Exception as ex: + _LOGGER.debug( + "Manual measurands set failed for '%s': %s; using charger's value", + self.id, + ex, + ) + effective_csv = await self.get_configuration(key) + else: + effective_csv = await self.get_configuration(key) - if len(accepted_measurands) > 0: - _LOGGER.debug(f"'{self.id}' allowed measurands: '{accepted_measurands}'") - await self.configure(key, accepted_measurands) + if effective_csv: + _LOGGER.debug("'%s' allowed measurands: '%s'", self.id, effective_csv) + # Only configure if we successfully set our desired CSV + if desired_csv and effective_csv == desired_csv: + await self.configure(key, effective_csv) else: - _LOGGER.debug(f"'{self.id}' measurands not configurable by integration") - _LOGGER.debug(f"'{self.id}' allowed measurands: '{chgr_measurands}'") + _LOGGER.debug("'%s' measurands not configurable by integration", self.id) - return accepted_measurands + return effective_csv or "" async def set_standard_configuration(self): """Send configuration values to the charger.""" @@ -264,29 +309,36 @@ async def trigger_boot_notification(self): async def trigger_status_notification(self): """Trigger status notifications for all connectors.""" - return_value = True try: - nof_connectors = int(self._metrics[0][cdet.connectors.value].value or 1) + n = int(self._metrics[0][cdet.connectors.value].value or 1) except Exception: - nof_connectors = 1 - for cid in range(0, nof_connectors + 1): - _LOGGER.debug(f"trigger status notification for connector={cid}") - req = call.TriggerMessage( - requested_message=MessageTrigger.status_notification, - connector_id=int(cid), - ) - resp = await self.call(req) - if resp.status != TriggerMessageStatus.accepted: - _LOGGER.warning("Failed with response: %s", resp.status) - _LOGGER.warning( - "Forcing number of connectors to %d, charger returned %d", - cid - 1, - nof_connectors, + n = 1 + + # Single connector: only probe 1. Multi: probe 0 then 1..n. + attempts = [1] if n <= 1 else [0] + list(range(1, n + 1)) + + for cid in attempts: + _LOGGER.debug("trigger status notification for connector=%s", cid) + try: + req = call.TriggerMessage( + requested_message=MessageTrigger.status_notification, + connector_id=int(cid), ) - self._metrics[0][cdet.connectors.value].value = max(1, cid - 1) - return_value = cid > 1 - break - return return_value + resp = await self.call(req) + status = getattr(resp, "status", None) + except Exception as ex: + _LOGGER.debug("TriggerMessage failed for connector=%s: %s", cid, ex) + status = None + + if status != TriggerMessageStatus.accepted: + if cid > 0: + _LOGGER.warning("Failed with response: %s", status) + # Reduce to the last known-good connector index. + self._metrics[0][cdet.connectors.value].value = max(1, cid - 1) + return False + # If connector 0 is rejected, continue probing numbered connectors. + + return True async def trigger_custom_message( self, @@ -471,8 +523,11 @@ def _mk_profile(purpose: str, cid: int) -> dict: "SetChargingProfile %s on connector %s raised %s", purpose, cid, ex ) - _LOGGER.warning("SetChargingProfile failed (last status=%s).", last_status) - await self.notify_ha(f"SetChargingProfile failed (last status={last_status}).") + if last_status is not None: + _LOGGER.warning("SetChargingProfile failed (last status=%s).", last_status) + await self.notify_ha( + f"SetChargingProfile failed (last status={last_status})." + ) return False async def set_availability(self, state: bool = True, connector_id: int | None = 0): @@ -732,30 +787,38 @@ async def async_update_device_info_v16(self, boot_info: dict): def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs): """Handle MeterValues (per connector). - - EAIR (Energy.Active.Import.Register) **without** transactionId is treated as main meter, - written to connector 0 (aggregate). - - EAIR **with** transactionId is written to the proper connector (connector_id) and used - to update Energy.Session (kWh). - - Other measurands handled via process_measurands(). + - EAIR **without** transactionId always writes to connector 0, + even if it decreases relative to a previously mirrored value. + - Tx-bound EAIR (with transactionId) writes to the specific connector using a non-decreasing rule, + **except** when a new transaction begins on that connector — then lower values are allowed. + - For single-connector chargers, mirror tx-bound EAIR to connector 0 **only** until a true + main-meter (no txId) value is observed. After that, do not mirror. + - Session energy is computed **only** from tx-bound EAIR, never from main-meter readings. """ transaction_id: int | None = kwargs.get(om.transaction_id.name, None) tx_has_id: bool = transaction_id not in (None, 0) - active_tx_for_conn: int = int(self._active_tx.get(connector_id, 0) or 0) + active_tx_for_conn: int | None = ( + int(self._active_tx.get(connector_id, 0) or 0) or None + ) # If missing meter_start or active_transaction_id try to restore from HA states. If HA # does not have values either, generate new ones. if self._metrics[(connector_id, csess.meter_start.value)].value is None: restored = self.get_ha_metric(csess.meter_start.value, connector_id) - if restored is None: - restored = self._metrics[(connector_id, DEFAULT_MEASURAND)].value - else: + restored_f: float | None + if restored is not None: try: - restored = float(restored) + restored_f = float(restored) except (ValueError, TypeError): - restored = None - if restored is not None: - self._metrics[(connector_id, csess.meter_start.value)].value = restored + restored_f = None + else: + # Fallback: if no txId and connector has a per-connector EAIR stored, use that + restored_f = None + if restored_f is not None: + self._metrics[ + (connector_id, csess.meter_start.value) + ].value = restored_f if self._metrics[(connector_id, csess.transaction_id.value)].value is None: restored_tx = self.get_ha_metric(csess.transaction_id.value, connector_id) @@ -766,7 +829,7 @@ def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs): except (ValueError, TypeError): candidate = None else: - candidate = transaction_id if tx_has_id else None + candidate = int(transaction_id) if tx_has_id else None if candidate is not None and candidate != 0: self._metrics[ @@ -775,6 +838,23 @@ def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs): self._active_tx[connector_id] = candidate active_tx_for_conn = candidate + # --- Detect a new transaction on this connector (for example ABB resets to 0 at Transaction.Begin) --- + new_tx_started = False + if tx_has_id and ( + active_tx_for_conn is None or int(transaction_id) != int(active_tx_for_conn) + ): + # Register the new transaction and clear per-connector EAIR so that a lower starting + # value (e.g., 0.0) is accepted. + self._metrics[(connector_id, csess.transaction_id.value)].value = int( + transaction_id + ) + self._active_tx[connector_id] = int(transaction_id) + active_tx_for_conn = int(transaction_id) + new_tx_started = True + # Reset tx-bound EAIR and session baseline; main meter (connector 0) remains untouched. + self._metrics[(connector_id, DEFAULT_MEASURAND)].value = None + self._metrics[(connector_id, csess.meter_start.value)].value = None + if tx_has_id: transaction_matches = transaction_id == active_tx_for_conn else: @@ -800,50 +880,105 @@ def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs): ) meter_values.append(measurands) - # Write main meter value (EAIR) to connector 0 if this message is missing transactionId - if not tx_has_id: - for bucket in meter_values: - for item in bucket: - measurand = item.measurand or DEFAULT_MEASURAND - if measurand == DEFAULT_MEASURAND: - eair_kwh = cp.get_energy_kwh(item) # Wh→kWh if necessary - # Aggregate (connector 0) carries the latest main meter value - self._metrics[(0, DEFAULT_MEASURAND)].value = eair_kwh - self._metrics[(0, DEFAULT_MEASURAND)].unit = HA_ENERGY_UNIT - if item.location is not None: - self._metrics[(0, DEFAULT_MEASURAND)].extra_attr[ - om.location.value - ] = item.location - if item.context is not None: - self._metrics[(0, DEFAULT_MEASURAND)].extra_attr[ - om.context.value - ] = item.context - else: - # Update EAIR on the specific connector during active transactions as well - for bucket in meter_values: - for item in bucket: - measurand = item.measurand or DEFAULT_MEASURAND - if measurand == DEFAULT_MEASURAND: - eair_kwh = cp.get_energy_kwh(item) - self._metrics[ - (connector_id, DEFAULT_MEASURAND) - ].value = eair_kwh - self._metrics[ - (connector_id, DEFAULT_MEASURAND) - ].unit = HA_ENERGY_UNIT - if item.location is not None: - self._metrics[(connector_id, DEFAULT_MEASURAND)].extra_attr[ - om.location.value - ] = item.location - if item.context is not None: - self._metrics[(connector_id, DEFAULT_MEASURAND)].extra_attr[ - om.context.value - ] = item.context - - self.process_measurands(meter_values, transaction_matches, connector_id) - - # Update session time if ongoing transaction - if active_tx_for_conn not in (None, 0): + # --- Helper to rank contexts when multiple EAIR candidates exist in a bucket --- + def _ctx_priority(ctx: str | None) -> int: + if ctx == "Transaction.End": + return 3 + if ctx == "Sample.Periodic": + return 2 + if ctx == "Sample.Clock": + return 1 + return 0 + + # --- Step 1: Apply EAIR --- + # target_cid = 0 (main meter) if no transactionId, else the connector itself + target_cid = 0 if not tx_has_id else connector_id + + for bucket in meter_values: + best_pr, best_val_kwh, best_item = -1, None, None + for item in bucket: + measurand = item.measurand or DEFAULT_MEASURAND + if measurand != DEFAULT_MEASURAND: + continue + # Ignore Transaction.Begin EAIR (often 0 right at start); ABB will be handled by new_tx_started + if item.context == "Transaction.Begin": + continue + try: + val_kwh = float(cp.get_energy_kwh(item)) + except Exception: + continue + if val_kwh < 0.0 or (val_kwh != val_kwh): + continue + pr = _ctx_priority(item.context) + if (pr > best_pr) or ( + pr == best_pr and (best_val_kwh is None or val_kwh > best_val_kwh) + ): + best_pr, best_val_kwh, best_item = pr, val_kwh, item + + if best_item is None: + continue + + if not tx_has_id and target_cid == 0: + # Authoritative main meter: always write (can decrease vs a mirrored value) + m = self._metrics[(0, DEFAULT_MEASURAND)] + m.value = best_val_kwh + m.unit = HA_ENERGY_UNIT + m.extra_attr["source"] = "main" + if best_item.context is not None: + m.extra_attr[om.context.value] = best_item.context + if best_item.location is not None: + m.extra_attr[om.location.value] = best_item.location + else: + # Tx-bound EAIR: write non-decreasing, unless a new transaction just began + m = self._metrics[(target_cid, DEFAULT_MEASURAND)] + prev = m.value + allow = new_tx_started or (prev is None or best_val_kwh >= float(prev)) + if allow: + m.value = best_val_kwh + m.unit = HA_ENERGY_UNIT + if best_item.context is not None: + m.extra_attr[om.context.value] = best_item.context + if best_item.location is not None: + m.extra_attr[om.location.value] = best_item.location + + # Mirror to connector 0 only for single-connector chargers, and only + # until we've observed an authoritative main meter. + try: + n_connectors = int(getattr(self, "num_connectors", 1) or 1) + except Exception: + n_connectors = 1 + if n_connectors == 1: + mm = self._metrics[(0, DEFAULT_MEASURAND)] + main_seen = mm.extra_attr.get("source") == "main" + if not main_seen: + prev_main = mm.value + allow_main = new_tx_started or ( + prev_main is None or best_val_kwh >= float(prev_main) + ) + if allow_main: + mm.value = best_val_kwh + mm.unit = HA_ENERGY_UNIT + mm.extra_attr["source"] = "mirrored_tx" + if best_item.context is not None: + mm.extra_attr[om.context.value] = best_item.context + if best_item.location is not None: + mm.extra_attr[om.location.value] = best_item.location + + # --- Step 2: Process non-EAIR measurands via existing pipeline --- + mv_wo_eair: list[list[MeasurandValue]] = [] + for bucket in meter_values: + filtered = [ + it + for it in bucket + if (it.measurand or DEFAULT_MEASURAND) != DEFAULT_MEASURAND + ] + if filtered: + mv_wo_eair.append(filtered) + self.process_measurands(mv_wo_eair, transaction_matches, connector_id) + + # --- Step 3: Update session metrics (time, energy) only for tx-bound EAIR --- + if tx_has_id and transaction_matches: + # Session time (minutes) — keep parity with previous behavior tx_start = float( self._metrics[(connector_id, csess.transaction_id.value)].value or time.time() @@ -855,23 +990,48 @@ def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs): (connector_id, csess.session_time.value) ].unit = UnitOfTime.MINUTES - # Update Energy.Session ONLY from EAIR in this message if txId exists and matches - if tx_has_id and transaction_matches: + # Session energy from tx-bound EAIR only eair_kwh_in_msg: float | None = None + best_ctx_prio = -1 for bucket in meter_values: for item in bucket: measurand = item.measurand or DEFAULT_MEASURAND - if measurand == DEFAULT_MEASURAND: - eair_kwh_in_msg = cp.get_energy_kwh(item) + if measurand != DEFAULT_MEASURAND: + continue + if item.context == "Transaction.Begin": + continue + try: + val_kwh = float(cp.get_energy_kwh(item)) + except Exception: + continue + if val_kwh < 0.0 or (val_kwh != val_kwh): + continue + pr = _ctx_priority(item.context) + if (pr > best_ctx_prio) or ( + pr == best_ctx_prio + and (eair_kwh_in_msg is None or val_kwh > eair_kwh_in_msg) + ): + best_ctx_prio = pr + eair_kwh_in_msg = val_kwh + if eair_kwh_in_msg is not None: + raw_start = self._metrics[(connector_id, csess.meter_start.value)].value try: - meter_start_kwh = float( - self._metrics[(connector_id, csess.meter_start.value)].value - or 0.0 + meter_start_kwh = ( + float(raw_start) if raw_start is not None else None ) except Exception: - meter_start_kwh = 0.0 - session_kwh = max(0.0, eair_kwh_in_msg - meter_start_kwh) + meter_start_kwh = None + + if meter_start_kwh is None: + # Initialize at first tx-bound EAIR; ABB starts at 0 which is desired here + self._metrics[ + (connector_id, csess.meter_start.value) + ].value = eair_kwh_in_msg + session_kwh = 0.0 + else: + session_kwh = max(0.0, eair_kwh_in_msg - meter_start_kwh) + self._metrics[ (connector_id, csess.session_energy.value) ].value = session_kwh diff --git a/tests/conftest.py b/tests/conftest.py index f6843c20..f0c15dfb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,20 @@ """Global fixtures for ocpp integration.""" import asyncio +from collections.abc import AsyncGenerator from unittest.mock import patch - +from pytest_homeassistant_custom_component.common import MockConfigEntry import pytest import websockets +from custom_components.ocpp.api import CentralSystem +from custom_components.ocpp.const import CONF_CPIDS, CONF_PORT, DOMAIN as OCPP_DOMAIN +from tests.const import MOCK_CONFIG_CP_APPEND, MOCK_CONFIG_DATA +from .charge_point_test import ( + create_configuration, + remove_configuration, +) + pytest_plugins = "pytest_homeassistant_custom_component" @@ -56,3 +65,26 @@ def error_get_data_fixture(): # side_effect=Exception, # ): yield + + +@pytest.fixture +async def setup_config_entry(hass, request) -> AsyncGenerator[CentralSystem, None]: + """Setup/teardown mock config entry and central system.""" + # Create a mock entry so we don't have to go through config flow + # Both version and minor need to match config flow so as not to trigger migration flow + config_data = MOCK_CONFIG_DATA.copy() + config_data[CONF_CPIDS].append( + {request.param["cp_id"]: MOCK_CONFIG_CP_APPEND.copy()} + ) + config_data[CONF_PORT] = request.param["port"] + config_entry = MockConfigEntry( + domain=OCPP_DOMAIN, + data=config_data, + entry_id=request.param["cms"], + title=request.param["cms"], + version=2, + minor_version=0, + ) + yield await create_configuration(hass, config_entry) + # tear down + await remove_configuration(hass, config_entry) diff --git a/tests/test_additional_charge_point_v16.py b/tests/test_additional_charge_point_v16.py new file mode 100644 index 00000000..92bb70f2 --- /dev/null +++ b/tests/test_additional_charge_point_v16.py @@ -0,0 +1,1196 @@ +"""Test additional v16 paths.""" + +import asyncio +import contextlib +from datetime import datetime, UTC +from types import SimpleNamespace + +import pytest +import websockets + +from ocpp.v16 import call +from ocpp.v16.enums import ( + TriggerMessageStatus, + ChargePointStatus, + ConfigurationStatus, +) + +from custom_components.ocpp.enums import ( + HAChargerDetails as cdet, + ConfigurationKey as ckey, +) + +from .test_charge_point_v16 import wait_ready, ChargePoint + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9116, "cp_id": "CP_trig_timeout_nonzero_adjusts", "cms": "cms_trig_tnz"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_trig_timeout_nonzero_adjusts"]) +@pytest.mark.parametrize("port", [9116]) +async def test_trigger_status_timeout_on_nonzero_adjusts_and_stops( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test trigger status timeout on nonzero adjusts and stops.""" + cs = setup_config_entry + attempts = [] + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + srv_cp = cs.charge_points[cp_id] + srv_cp._metrics[0][cdet.connectors.value].value = 2 + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + attempts.append(req.connector_id) + if req.connector_id == 2: + raise TimeoutError("simulated") + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv_cp, "call", fake_call, raising=True) + + ok = await srv_cp.trigger_status_notification() + assert ok is False + # Should stop after the failing connector + assert attempts == [0, 1, 2] + assert int(srv_cp._metrics[0][cdet.connectors.value].value) == 1 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9310, "cp_id": "CP_cov_conn_exc", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_conn_exc"]) +@pytest.mark.parametrize("port", [9310]) +async def test_get_number_of_connectors_exception_defaults( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test get number of connectors when exception defaults to 1.""" + cs = setup_config_entry + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + async def fake_call(req): + # Simulate failure when requesting NumberOfConnectors + if isinstance(req, call.GetConfiguration): + raise TypeError("boom") + return SimpleNamespace() + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + n = await srv.get_number_of_connectors() + assert n == 1 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9311, "cp_id": "CP_cov_conn_bad", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_conn_bad"]) +@pytest.mark.parametrize("port", [9311]) +async def test_get_number_of_connectors_invalid_value_defaults( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test get number of connectors with invalid value defaults to 1.""" + cs = setup_config_entry + + class FakeResp: + def __init__(self): + self.configuration_key = [{"key": "NumberOfConnectors", "value": "n/a"}] + self.unknown_key = None + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + async def fake_call(req): + if isinstance(req, call.GetConfiguration): + return FakeResp() + return SimpleNamespace() + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + n = await srv.get_number_of_connectors() + assert n == 1 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9312, "cp_id": "CP_cov_auto_exc", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_auto_exc"]) +@pytest.mark.parametrize("port", [9312]) +async def test_autodetect_measurands_change_configuration_exception( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test autodetect measurands when ChangeConfiguration raises and fallback occurs.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Enable autodetect and set a desired CSV to trigger the path + srv.settings.monitored_variables_autoconfig = True + srv.settings.monitored_variables = "Power.Active.Import,Voltage" + + async def fake_call(req): + # Fail on ChangeConfiguration, so code reads back via GetConfiguration + if isinstance(req, call.ChangeConfiguration): + raise TypeError("set failed") + if isinstance(req, call.GetConfiguration): + return SimpleNamespace( + configuration_key=[{"value": "Voltage"}], unknown_key=None + ) + return SimpleNamespace() + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + result = await srv.get_supported_measurands() + assert result == "Voltage" + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9313, "cp_id": "CP_cov_manual_ok", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_manual_ok"]) +@pytest.mark.parametrize("port", [9313]) +async def test_measurands_manual_set_accepted_configures( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test manual measurands set accepted and configure is called.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Disable autodetect; set a desired CSV + srv.settings.monitored_variables_autoconfig = False + srv.settings.monitored_variables = "Energy.Active.Import.Register,Voltage" + + called_configure = [] + + async def fake_call(req): + if isinstance(req, call.ChangeConfiguration): + return SimpleNamespace(status=ConfigurationStatus.accepted) + return SimpleNamespace() + + async def fake_configure(key, value): + called_configure.append((key, value)) + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + monkeypatch.setattr(srv, "configure", fake_configure, raising=True) + + result = await srv.get_supported_measurands() + assert result == srv.settings.monitored_variables + # configure() should have been called with the accepted CSV + assert ( + called_configure + and called_configure[0][0] == ckey.meter_values_sampled_data.value + ) + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9314, "cp_id": "CP_cov_manual_rej", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_manual_rej"]) +@pytest.mark.parametrize("port", [9314]) +async def test_measurands_manual_set_rejected_returns_empty( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test manual measurands rejected and fallback returns empty string.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + srv.settings.monitored_variables_autoconfig = False + srv.settings.monitored_variables = "Energy.Active.Import.Register" + + async def fake_call(req): + if isinstance(req, call.ChangeConfiguration): + return SimpleNamespace(status=ConfigurationStatus.rejected) + if isinstance(req, call.GetConfiguration): + # Simulate charger returning no value for the requested key + # by providing an empty configuration_key list (attribute present). + return SimpleNamespace(configuration_key=[], unknown_key=None) + return SimpleNamespace() + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + result = await srv.get_supported_measurands() + assert result == "" # effective_csv was empty + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9315, "cp_id": "CP_cov_trig_bn", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_trig_bn"]) +@pytest.mark.parametrize("port", [9315]) +async def test_trigger_boot_notification_accepts( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test trigger boot notification accepted path.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + ok = await srv.trigger_boot_notification() + assert ok is True and srv.triggered_boot_notification is True + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9316, "cp_id": "CP_cov_trig_stat_n_exc", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_trig_stat_n_exc"]) +@pytest.mark.parametrize("port", [9316]) +async def test_trigger_status_notification_connector_count_parse_exception( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test trigger status notification when connector count parse exception causes n=1.""" + cs = setup_config_entry + attempts = [] + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + # Force parse error so n=1 + srv._metrics[0][cdet.connectors.value].value = "bad" + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + attempts.append(req.connector_id) + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + ok = await srv.trigger_status_notification() + assert ok is True + assert attempts == [1] # n<=1 -> probe only connector 1 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9317, "cp_id": "CP_cov_trig_custom_bad", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_trig_custom_bad"]) +@pytest.mark.parametrize("port", [9317]) +async def test_trigger_custom_message_unsupported_name( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test trigger custom message rejects unsupported trigger names.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + ok = await srv.trigger_custom_message("not_a_trigger") + assert ok is False + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(5) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9318, "cp_id": "CP_cov_profile_ids", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_profile_ids"]) +@pytest.mark.parametrize("port", [9318]) +async def test_profile_ids_for_bad_conn_id_cast( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test profile ids path when conn_id cast fails and conn_seg defaults to 1.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + pid, level = srv._profile_ids_for(conn_id="X", purpose="TxDefaultProfile") + # conn_seg should fall back to 1 -> pid = 1000 + 2 + (1*10) = 1012 + assert (pid, level) == (1012, 1) + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9319, "cp_id": "CP_cov_stop_tx_early", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_stop_tx_early"]) +@pytest.mark.parametrize("port", [9319]) +async def test_stop_transaction_early_return( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test stop transaction early return when there is no active transaction.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + srv.active_transaction_id = 0 + srv._active_tx.clear() + ok = await srv.stop_transaction() + assert ok is True + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9320, "cp_id": "CP_cov_update_fw", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_update_fw"]) +@pytest.mark.parametrize("port", [9320]) +async def test_update_firmware_wait_time_invalid_falls_back( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test update firmware path when wait_time is invalid and falls back to immediate time.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + async def fake_call(req): + return SimpleNamespace() # success path + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + ok = await srv.update_firmware( + "https://example.com/fw.bin", wait_time="not-int" + ) + assert ok is True + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9321, "cp_id": "CP_cov_getcfg_empty", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_getcfg_empty"]) +@pytest.mark.parametrize("port", [9321]) +async def test_get_configuration_empty_key_path( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test get_configuration empty key path uses call without key property.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + async def fake_call(req): + # When key is empty, code constructs call.GetConfiguration() (no key list) + assert isinstance(req, call.GetConfiguration) and not getattr( + req, "key", None + ) + return SimpleNamespace( + configuration_key=[{"value": "42"}], unknown_key=None + ) + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + val = await srv.get_configuration("") + assert val == "42" + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9322, "cp_id": "CP_cov_config_ro", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_config_ro"]) +@pytest.mark.parametrize("port", [9322]) +async def test_configure_readonly_warns_and_notifies( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test configure warns and notifies when key is read-only.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + notified = [] + + async def fake_call(req): + if isinstance(req, call.GetConfiguration): + return SimpleNamespace( + configuration_key=[ + {"key": "Foo", "value": "Bar", "readonly": True} + ], + unknown_key=None, + ) + # ChangeConfiguration may still be issued; return accepted for completeness + if isinstance(req, call.ChangeConfiguration): + return SimpleNamespace(status=ConfigurationStatus.accepted) + return SimpleNamespace() + + async def fake_notify(msg): + notified.append(msg) + + monkeypatch.setattr(srv, "call", fake_call, raising=True) + monkeypatch.setattr(srv, "notify_ha", fake_notify, raising=True) + await srv.configure("Foo", "Baz") + # A warning/notification should be pushed for read-only + assert any("read-only" in m for m in notified) + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9323, "cp_id": "CP_cov_restore_ms", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_restore_ms"]) +@pytest.mark.parametrize("port", [9323]) +async def test_restore_meter_start_cast_exception( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test restore meter start from HA when cast raises and remains None.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + # Force metric slot to look missing + srv._metrics[(1, "Energy.Meter.Start")].value = None + + def fake_get_ha_metric(name, connector_id=None): + if name == "Energy.Meter.Start" and connector_id == 1: + return "not-a-float" + return None + + monkeypatch.setattr(srv, "get_ha_metric", fake_get_ha_metric, raising=True) + + # Send a MeterValues WITHOUT transactionId to trigger restore branch + mv_no_tx = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "15000", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Inlet", + "context": "Sample.Clock", + } + ], + } + ], + ) + # Open a WS client to deliver this message + resp = await cp.call(mv_no_tx) + assert resp is not None + assert srv._metrics[(1, "Energy.Meter.Start")].value is None + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9324, "cp_id": "CP_cov_restore_tx", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_restore_tx"]) +@pytest.mark.parametrize("port", [9324]) +async def test_restore_transaction_id_cast_exception( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test restore transaction id when cast fails leaving candidate as None.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + srv._metrics[(1, "Transaction.Id")].value = None + + def fake_get_ha_metric(name, connector_id=None): + if name == "Transaction.Id" and connector_id == 1: + return "not-an-int" + return None + + monkeypatch.setattr(srv, "get_ha_metric", fake_get_ha_metric, raising=True) + + # Trigger the handler with a MeterValues (no strict need to carry tx) + mv_no_tx = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "1", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Inlet", + "context": "Sample.Clock", + } + ], + } + ], + ) + _ = await cp.call(mv_no_tx) + # Value remains unset because candidate was None + assert srv._metrics[(1, "Transaction.Id")].value is None + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9325, "cp_id": "CP_cov_new_tx", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_new_tx"]) +@pytest.mark.parametrize("port", [9325]) +async def test_new_transaction_resets_tx_bound_metrics( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test new transaction detection resets tx-bound EAIR and meter_start.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + # Preload per-connector values which should be cleared when new tx starts + srv._metrics[(1, "Energy.Active.Import.Register")].value = 999.0 + srv._metrics[(1, "Energy.Meter.Start")].value = 888.0 + + # Send MeterValues with a new transactionId + mv_tx = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "0", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "context": "Sample.Clock", + "location": "Inlet", + } + ], + } + ], + transaction_id=12345, + ) + resp = await cp.call(mv_tx) + assert resp is not None + # New tx should be registered + assert srv._active_tx.get(1) == 12345 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9326, "cp_id": "CP_cov_eair_cast_exc", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_eair_cast_exc"]) +@pytest.mark.parametrize("port", [9326]) +async def test_eair_get_energy_kwh_exception_ignored( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test EAIR scan ignores entries when energy_kwh conversion raises.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + # Monkeypatch energy conversion to raise + from custom_components.ocpp.ocppv16 import cp as cp_mod + + monkeypatch.setattr( + cp_mod, + "get_energy_kwh", + lambda item: (_ for _ in ()).throw(RuntimeError("bad")), + ) + + mv = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "1000", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "context": "Sample.Clock", + } + ], + } + ], + transaction_id=1, + ) + # Should not raise + _ = await cp.call(mv) + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9327, "cp_id": "CP_cov_num_conn_int_exc", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_num_conn_int_exc"]) +@pytest.mark.parametrize("port", [9327]) +async def test_mirror_single_connector_handles_int_exception( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test single connector mirroring handles int casting exception by falling back.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + class Bad: + def __int__(self): + raise ValueError("no int") + + srv.num_connectors = Bad() + + mv = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "1000", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "context": "Sample.Clock", + } + ], + } + ], + transaction_id=2, + ) + _ = await cp.call(mv) + # If we reach here without exceptions, fallback worked (n_connectors=1) + assert True + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9328, "cp_id": "CP_cov_sess_energy_cast_exc", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_sess_energy_cast_exc"]) +@pytest.mark.parametrize("port", [9328]) +async def test_session_energy_get_energy_kwh_exception_ignored( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test session energy calculation ignores EAIR entries raising conversion errors.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + from custom_components.ocpp.ocppv16 import cp as cp_mod + + monkeypatch.setattr( + cp_mod, + "get_energy_kwh", + lambda item: (_ for _ in ()).throw(RuntimeError("bad")), + ) + + mv = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "1000", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "context": "Sample.Clock", + } + ], + } + ], + transaction_id=3, + ) + _ = await cp.call(mv) + # No crash == lines 1005-1006 exercised + assert True + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9329, "cp_id": "CP_cov_sess_ms_cast_exc", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_sess_ms_cast_exc"]) +@pytest.mark.parametrize("port", [9329]) +async def test_session_energy_meter_start_cast_exception( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test session energy path when meter_start cannot be cast to float.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + # Poison meter_start with a non-float so that float() raises + srv._metrics[(1, "Energy.Meter.Start")].value = object() + + mv = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "1000", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "context": "Sample.Clock", + } + ], + } + ], + transaction_id=4, + ) + _ = await cp.call(mv) + # No crash is sufficient to cover lines 1023-1024 + assert True + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9330, "cp_id": "CP_cov_status_suspended", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_status_suspended"]) +@pytest.mark.parametrize("port", [9330]) +async def test_status_notification_suspended_resets_metrics( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test status notification suspended state resets power/current metrics to zero.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Pre-populate metrics (non-zero) so they can be zeroed on suspended status + for meas in [ + "Current.Import", + "Power.Active.Import", + "Power.Reactive.Import", + "Current.Export", + "Power.Active.Export", + "Power.Reactive.Export", + ]: + srv._metrics[(1, meas)].value = 123 + + # Simulate status notification + resp = srv.on_status_notification( + connector_id=1, + error_code="NoError", + status=ChargePointStatus.suspended_ev.value, + ) + assert resp is not None + for meas in [ + "Current.Import", + "Power.Active.Import", + "Power.Reactive.Import", + "Current.Export", + "Power.Active.Export", + "Power.Reactive.Export", + ]: + assert int(srv._metrics[(1, meas)].value) == 0 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9331, "cp_id": "CP_cov_start_tx_ms_cast_exc", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_start_tx_ms_cast_exc"]) +@pytest.mark.parametrize("port", [9331]) +async def test_start_transaction_meter_start_cast_exception( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test start transaction handler path when meter_start cast fails defaults to 0.0.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Ensure authorization passes so the handler proceeds normally + monkeypatch.setattr( + srv, "get_authorization_status", lambda id_tag: "Accepted", raising=True + ) + + # Call the handler directly with a non-numeric meter_start + result = srv.on_start_transaction( + connector_id=1, id_tag="test_cp", meter_start="not-a-number" + ) + assert result is not None + # The cast fails internally and baseline defaults to 0.0 (lines 1147-1148) + assert float(srv._metrics[(1, "Energy.Meter.Start")].value) == 0.0 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9332, "cp_id": "CP_cov_start_tx_denied", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_start_tx_denied"]) +@pytest.mark.parametrize("port", [9332]) +async def test_start_transaction_auth_denied_returns_tx0( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Test start transaction returns transaction id 0 when authorization is denied.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Force non-accepted authorization + monkeypatch.setattr( + srv, + "get_authorization_status", + lambda id_tag: "Invalid", + raising=True, + ) + # Call handler directly to inspect response + result = srv.on_start_transaction( + connector_id=1, id_tag="bad", meter_start=0 + ) + assert result.transaction_id == 0 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9333, "cp_id": "CP_cov_stop_tx_paths", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_stop_tx_paths"]) +@pytest.mark.parametrize("port", [9333]) +async def test_stop_transaction_misc_paths( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test stop transaction paths covering unknown unit and meter_stop cast exception.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Simulate a running transaction + srv._active_tx[1] = 777 + srv.active_transaction_id = 777 + + # Prepare main-meter EAIR with unknown unit to drive line 1213 + srv._metrics[(1, "Energy.Active.Import.Register")].value = 123.456 + srv._metrics[(1, "Energy.Active.Import.Register")].unit = "kWs" # unknown + + # Call stop_transaction with a non-numeric meter_stop so 1225-1226 -> 0.0 + result = srv.on_stop_transaction( + meter_stop="not-a-number", timestamp=None, transaction_id=777 + ) + assert result is not None + # Session energy should be computed; we mainly care lines executed without crash + assert srv._active_tx[1] == 0 and srv.active_transaction_id == 0 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() diff --git a/tests/test_api_paths.py b/tests/test_api_paths.py index 3e32d4da..327d3426 100644 --- a/tests/test_api_paths.py +++ b/tests/test_api_paths.py @@ -19,7 +19,7 @@ from custom_components.ocpp.chargepoint import Metric as M from custom_components.ocpp.chargepoint import SetVariableResult -from .test_charge_point_v16 import MOCK_CONFIG_DATA +from tests.const import MOCK_CONFIG_DATA class DummyCP: diff --git a/tests/test_charge_point_v16.py b/tests/test_charge_point_v16.py index 9ad92715..667e8a00 100644 --- a/tests/test_charge_point_v16.py +++ b/tests/test_charge_point_v16.py @@ -10,7 +10,6 @@ from types import SimpleNamespace import pytest -from pytest_homeassistant_custom_component.common import MockConfigEntry from homeassistant.exceptions import HomeAssistantError import websockets @@ -22,13 +21,13 @@ CONF_CPIDS, CONF_CPID, CONF_NUM_CONNECTORS, - CONF_PORT, DEFAULT_ENERGY_UNIT, DEFAULT_MEASURAND, HA_ENERGY_UNIT, ) from custom_components.ocpp.enums import ( ConfigurationKey, + HAChargerDetails as cdet, HAChargerServices as csvcs, HAChargerStatuses as cstat, HAChargerSession as csess, @@ -62,15 +61,12 @@ ) from .const import ( - MOCK_CONFIG_DATA, MOCK_CONFIG_CP_APPEND, ) from .charge_point_test import ( set_switch, press_button, set_number, - create_configuration, - remove_configuration, wait_ready, ) @@ -233,29 +229,6 @@ async def test_services(hass, cpid, serv_list, socket_enabled): test_services.__test__ = False -@pytest.fixture -async def setup_config_entry(hass, request) -> CentralSystem: - """Setup/teardown mock config entry and central system.""" - # Create a mock entry so we don't have to go through config flow - # Both version and minor need to match config flow so as not to trigger migration flow - config_data = MOCK_CONFIG_DATA.copy() - config_data[CONF_CPIDS].append( - {request.param["cp_id"]: MOCK_CONFIG_CP_APPEND.copy()} - ) - config_data[CONF_PORT] = request.param["port"] - config_entry = MockConfigEntry( - domain=OCPP_DOMAIN, - data=config_data, - entry_id=request.param["cms"], - title=request.param["cms"], - version=2, - minor_version=0, - ) - yield await create_configuration(hass, config_entry) - # tear down - await remove_configuration(hass, config_entry) - - # @pytest.mark.skip(reason="skip") @pytest.mark.timeout(20) # Set timeout for this test @pytest.mark.parametrize( @@ -2926,7 +2899,7 @@ async def fake_call_error(req): await ws.close() -@pytest.mark.timeout(10) +@pytest.mark.timeout(30) @pytest.mark.parametrize( "setup_config_entry", [{"port": 9090, "cp_id": "CP_avail3", "cms": "cms_avail3"}], @@ -3152,7 +3125,7 @@ async def fake_notify(msg: str, title: str = "Ocpp integration"): await ws.close() -@pytest.mark.timeout(10) +@pytest.mark.timeout(20) @pytest.mark.parametrize( "setup_config_entry", [{"port": 9094, "cp_id": "CP_setrate_2", "cms": "cms_setrate_2"}], @@ -3236,10 +3209,7 @@ async def fake_notify(msg: str, title: str = "Ocpp integration"): async def test_set_charge_rate_set_call_raises_for_all_attempts( hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch ): - """SetChargingProfile raises for all attempts -> function should catch and continue. - - After all attempts fail, returns False and notify_ha(last_status=None). - """ + """SetChargingProfile raises for all attempts -> function swallows errors, returns False, and does not notify HA.""" cs: CentralSystem = setup_config_entry async with websockets.connect( f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] @@ -3271,18 +3241,18 @@ async def fake_call(req): monkeypatch.setattr(srv_cp, "call", fake_call, raising=True) - captured = {"msg": None} + notify_calls = {"n": 0} async def fake_notify(msg: str, title: str = "Ocpp integration"): - captured["msg"] = msg + notify_calls["n"] += 1 return True monkeypatch.setattr(srv_cp, "notify_ha", fake_notify, raising=True) ok = await srv_cp.set_charge_rate(limit_amps=6, conn_id=1) assert ok is False - # last_status stays None because we never got a resp to read .status from - assert "last status=None" in (captured["msg"] or "") + # No user-facing notification on periodic/internal failure + assert notify_calls["n"] == 0 finally: cp_task.cancel() @@ -3342,6 +3312,1451 @@ async def fake_call(req): await ws.close() +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9097, "cp_id": "CP_eair_no_tx", "cms": "cms_eair_no_tx"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_eair_no_tx"]) +@pytest.mark.parametrize("port", [9097]) +async def test_on_meter_values_no_tx_aggregate_ignores_begin_and_converts_wh( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test that Transaction.Begin is ignored when Periodic also in the same message.""" + + cs: CentralSystem = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + # Both Periodic (4369 Wh) and Begin (0) in the same bucket + req = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Periodic", + "unit": "Wh", + "value": "4369", + }, + { + "measurand": "Energy.Active.Import.Register", + "context": "Transaction.Begin", + "unit": "Wh", + "value": "0", + }, + ], + } + ], + ) + await cp.call(req) + + val = cs.get_metric(cp_id, "Energy.Active.Import.Register", connector_id=0) + assert val == pytest.approx(4.369, rel=1e-6) + + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9098, "cp_id": "CP_eair_tx", "cms": "cms_eair_tx"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_eair_tx"]) +@pytest.mark.parametrize("port", [9098]) +async def test_on_meter_values_tx_updates_connector_and_session_energy( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test that values with txId writes to connector and updates Energy.Session from the best EAIR.""" + + cs: CentralSystem = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + req1 = call.MeterValues( + connector_id=1, + transaction_id=111, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Periodic", + "unit": "Wh", + "value": "1000", + } + ], + } + ], + ) + await cp.call(req1) + + v1 = cs.get_metric(cp_id, "Energy.Active.Import.Register", connector_id=1) + s1 = cs.get_metric(cp_id, "Energy.Session", connector_id=1) + assert v1 == pytest.approx(1.0, rel=1e-6) + assert s1 == pytest.approx(0.0, rel=1e-6) + + req2 = call.MeterValues( + connector_id=1, + transaction_id=111, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Periodic", + "unit": "kWh", + "value": "1.5", + }, + { + "measurand": "Energy.Active.Import.Register", + "context": "Transaction.Begin", + "unit": "Wh", + "value": "0", + }, + ], + } + ], + ) + await cp.call(req2) + + v2 = cs.get_metric(cp_id, "Energy.Active.Import.Register", connector_id=1) + s2 = cs.get_metric(cp_id, "Energy.Session", connector_id=1) + assert v2 == pytest.approx(1.5, rel=1e-6) + assert s2 == pytest.approx(0.5, rel=1e-6) + + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9099, "cp_id": "CP_eair_prio", "cms": "cms_eair_prio"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_eair_prio"]) +@pytest.mark.parametrize("port", [9099]) +async def test_on_meter_values_priority_end_over_periodic( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test that Transaction.End wins over Sample.Periodic (no matter the order).""" + + cs: CentralSystem = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + req = call.MeterValues( + connector_id=2, + transaction_id=222, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Periodic", + "unit": "kWh", + "value": "10.0", + }, + { + "measurand": "Energy.Active.Import.Register", + "context": "Transaction.End", + "unit": "kWh", + "value": "10.5", + }, + ], + } + ], + ) + await cp.call(req) + + v = cs.get_metric(cp_id, "Energy.Active.Import.Register", connector_id=2) + assert v == pytest.approx(10.5, rel=1e-6) + + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9100, "cp_id": "CP_eair_sanitize", "cms": "cms_eair_sanitize"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_eair_sanitize"]) +@pytest.mark.parametrize("port", [9100]) +async def test_on_meter_values_sanitizes_and_ignores_exceptions( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """NaN & negatives ignored; get_energy_kwh exception ignored; Periodic finally wins.""" + + cs: CentralSystem = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + from custom_components.ocpp import ocppv16 as mod_v16 + + orig_get_e_kwh = mod_v16.cp.get_energy_kwh + + def flaky_get_energy_kwh(item): + try: + if ( + getattr(item, "context", None) == "Sample.Clock" + and getattr(item, "unit", None) in ("kWh", "Wh") + and float(getattr(item, "value", -1)) == 1234.0 + ): + raise ValueError("simulated conversion error") + except Exception: + pass + return orig_get_e_kwh(item) + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + monkeypatch.setattr( + mod_v16.cp, "get_energy_kwh", flaky_get_energy_kwh, raising=True + ) + + req = call.MeterValues( + connector_id=1, + transaction_id=333, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + # 1) NaN -> should be ignored + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Clock", + "unit": "kWh", + "value": "NaN", + }, + # 2) Negativt -> should be ignored + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Clock", + "unit": "kWh", + "value": "-1", + }, + # 3) Valid, but the patch will let get_energy_kwh throw -> ignored by except + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Clock", + "unit": "kWh", + "value": "1234", + }, + # 4) Finally a good Periodic that is chosen + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Periodic", + "unit": "kWh", + "value": "3.2", + }, + ], + } + ], + ) + await cp.call(req) + + v = cs.get_metric(cp_id, "Energy.Active.Import.Register", connector_id=1) + s = cs.get_metric(cp_id, "Energy.Session", connector_id=1) + assert v == pytest.approx(3.2, rel=1e-6) + assert s == pytest.approx(0.0, rel=1e-6) + + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + # Restore the patch + with contextlib.suppress(Exception): + monkeypatch.setattr( + mod_v16.cp, "get_energy_kwh", orig_get_e_kwh, raising=True + ) + + +@pytest.mark.timeout(20) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9101, "cp_id": "CP_eair_prio_vs_value", "cms": "cms_eair_prio_vs_value"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_eair_prio_vs_value"]) +@pytest.mark.parametrize("port", [9101]) +async def test_on_meter_values_priority_beats_raw_value( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Test that prio beats raw value.""" + + cs: CentralSystem = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + # "Other" context has prio 0; Periodic has prio 2 -> Periodic should be picked even if the value is lower. + req = call.MeterValues( + connector_id=1, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "measurand": "Energy.Active.Import.Register", + "context": "Other", + "unit": "kWh", + "value": "2.0", + }, + { + "measurand": "Energy.Active.Import.Register", + "context": "Sample.Periodic", + "unit": "kWh", + "value": "1.0", + }, + ], + } + ], + ) + await cp.call(req) + + v = cs.get_metric(cp_id, "Energy.Active.Import.Register", connector_id=0) + assert v == pytest.approx(1.0, rel=1e-6) + + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9111, "cp_id": "CP_trig_single_ok", "cms": "cms_trig_single_ok"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_trig_single_ok"]) +@pytest.mark.parametrize("port", [9111]) +async def test_trigger_status_single_accepts( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """n=1: should NOT try connectorId=0; only cid=1; accepted -> True.""" + cs: CentralSystem = setup_config_entry + attempts = [] + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + srv_cp = cs.charge_points[cp_id] + # force single connector + srv_cp._metrics[0][cdet.connectors.value].value = 1 + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + attempts.append(req.connector_id) + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv_cp, "call", fake_call, raising=True) + + ok = await srv_cp.trigger_status_notification() + assert ok is True + assert attempts == [1] + assert int(srv_cp._metrics[0][cdet.connectors.value].value) == 1 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9112, "cp_id": "CP_trig_multi_ok", "cms": "cms_trig_multi_ok"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_trig_multi_ok"]) +@pytest.mark.parametrize("port", [9112]) +async def test_trigger_status_multi_all_accepts( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """n=2: multi-connector happy path. Should return True and not reduce connector count.""" + cs: CentralSystem = setup_config_entry + attempts = [] + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + srv_cp = cs.charge_points[cp_id] + srv_cp._metrics[0][cdet.connectors.value].value = 2 + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + attempts.append(req.connector_id) + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv_cp, "call", fake_call, raising=True) + + ok = await srv_cp.trigger_status_notification() + assert ok is True + assert attempts == [0, 1, 2] + assert srv_cp._metrics[0][cdet.connectors.value].value == 2 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9113, "cp_id": "CP_trig_reject_zero_continue", "cms": "cms_trig_r0"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_trig_reject_zero_continue"]) +@pytest.mark.parametrize("port", [9113]) +async def test_trigger_status_reject_zero_but_accept_rest( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """n=2: cid=0 -> Rejected (ignored), 1 & 2 accepted -> True; connector count unchanged.""" + cs: CentralSystem = setup_config_entry + attempts = [] + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + srv_cp = cs.charge_points[cp_id] + srv_cp._metrics[0][cdet.connectors.value].value = 2 + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + attempts.append(req.connector_id) + if req.connector_id == 0: + return SimpleNamespace(status="Rejected") + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv_cp, "call", fake_call, raising=True) + + ok = await srv_cp.trigger_status_notification() + assert ok is True + assert attempts == [0, 1, 2] + # should not downgrade connector count because only cid=0 rejected + assert int(srv_cp._metrics[0][cdet.connectors.value].value) == 2 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9114, "cp_id": "CP_trig_reject_nonzero_adjusts", "cms": "cms_trig_rnz"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_trig_reject_nonzero_adjusts"]) +@pytest.mark.parametrize("port", [9114]) +async def test_trigger_status_reject_nonzero_adjusts_and_stops( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """n=3: 0 & 1 accepted; 2 rejected -> set connectors to 1 (cid-1), return False, stop before 3.""" + cs: CentralSystem = setup_config_entry + attempts = [] + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + srv_cp = cs.charge_points[cp_id] + srv_cp._metrics[0][cdet.connectors.value].value = 3 + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + attempts.append(req.connector_id) + if req.connector_id == 2: + return SimpleNamespace(status="Rejected") + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv_cp, "call", fake_call, raising=True) + + ok = await srv_cp.trigger_status_notification() + assert ok is False + assert attempts == [0, 1, 2] + # reduced to cid-1 => 1 + assert int(srv_cp._metrics[0][cdet.connectors.value].value) == 1 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9115, "cp_id": "CP_trig_timeout_zero_continue", "cms": "cms_trig_t0"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_trig_timeout_zero_continue"]) +@pytest.mark.parametrize("port", [9115]) +async def test_trigger_status_timeout_on_zero_continues( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """n=2: cid=0 raises TimeoutError (ignored), others accepted -> True; count unchanged.""" + cs: CentralSystem = setup_config_entry + attempts = [] + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + srv_cp = cs.charge_points[cp_id] + srv_cp._metrics[0][cdet.connectors.value].value = 2 + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + attempts.append(req.connector_id) + if req.connector_id == 0: + raise TimeoutError("simulated") + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv_cp, "call", fake_call, raising=True) + + ok = await srv_cp.trigger_status_notification() + assert ok is True + assert attempts == [0, 1, 2] + assert int(srv_cp._metrics[0][cdet.connectors.value].value) == 2 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9116, "cp_id": "CP_trig_timeout_nonzero_adjusts", "cms": "cms_trig_tnz"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_trig_timeout_nonzero_adjusts"]) +@pytest.mark.parametrize("port", [9116]) +async def test_trigger_status_timeout_on_nonzero_adjusts_and_stops( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """n=2: cid=2 raises TimeoutError -> set connectors to 1, return False, stop.""" + cs: CentralSystem = setup_config_entry + attempts = [] + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + await cp.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + + srv_cp = cs.charge_points[cp_id] + srv_cp._metrics[0][cdet.connectors.value].value = 2 + + async def fake_call(req): + if isinstance(req, call.TriggerMessage): + attempts.append(req.connector_id) + if req.connector_id == 2: + raise TimeoutError("simulated") + return SimpleNamespace(status=TriggerMessageStatus.accepted) + return SimpleNamespace() + + monkeypatch.setattr(srv_cp, "call", fake_call, raising=True) + + ok = await srv_cp.trigger_status_notification() + assert ok is False + # Should stop after the failing connector + assert attempts == [0, 1, 2] + assert int(srv_cp._metrics[0][cdet.connectors.value].value) == 1 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(30) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9120, "cp_id": "CP_postconn_ex_1", "cms": "cms_postconn_ex_1"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_postconn_ex_1"]) +@pytest.mark.parametrize("port", [9120]) +async def test_post_connect_fetch_supported_features_raises( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """fetch_supported_features raises inside post_connect -> swallowed; post_connect_success stays False.""" + cs: CentralSystem = setup_config_entry + + # Patch before connecting so our call to post_connect() hits the boom. + from custom_components.ocpp.ocppv16 import ChargePoint as ServerCP + + async def boom(self): + raise RuntimeError("fetch boom") + + monkeypatch.setattr(ServerCP, "fetch_supported_features", boom, raising=True) + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + # client test CP + from tests.test_charge_point_v16 import ChargePoint + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + # Vänta bara tills servern registrerat CP-objektet + # (ingen BootNotification -> ingen auto post_connect) + await asyncio.sleep(0.05) + srv_cp = cs.charge_points[cp_id] + + # Säkerställ definierat initialt värde + setattr(srv_cp, "post_connect_success", False) + + # Kör post_connect() – ska svälja exception och inte sätta success=True + await srv_cp.post_connect() + + assert getattr(srv_cp, "post_connect_success", False) is not True + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9121, "cp_id": "CP_postconn_ex_2", "cms": "cms_postconn_ex_2"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_postconn_ex_2"]) +@pytest.mark.parametrize("port", [9121]) +async def test_post_connect_set_availability_error_swallowed_and_REM_triggers_called( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Inner try around set_availability: generic Exception swallowed, and REM triggers still called.""" + cs: CentralSystem = setup_config_entry + + # Patch server CP methods before connecting. + from custom_components.ocpp.ocppv16 import ChargePoint as ServerCP + + async def ok_fetch(self): + return None + + async def ok_get_n(self): + return 1 + + async def ok_hb(self): + return 300 + + async def ok_meas(self): + return "Voltage" + + async def ok_set_std(self): + return None + + async def nope_avail(self): + raise ValueError("availability failed") + + called = {"boot": 0, "status": 0} + + async def fake_boot(self): + called["boot"] += 1 + + async def fake_status(self): + called["status"] += 1 + + monkeypatch.setattr(ServerCP, "fetch_supported_features", ok_fetch, raising=True) + monkeypatch.setattr(ServerCP, "get_number_of_connectors", ok_get_n, raising=True) + monkeypatch.setattr(ServerCP, "get_heartbeat_interval", ok_hb, raising=True) + monkeypatch.setattr(ServerCP, "get_supported_measurands", ok_meas, raising=True) + monkeypatch.setattr( + ServerCP, "set_standard_configuration", ok_set_std, raising=True + ) + monkeypatch.setattr(ServerCP, "set_availability", nope_avail, raising=True) + monkeypatch.setattr(ServerCP, "trigger_boot_notification", fake_boot, raising=True) + monkeypatch.setattr( + ServerCP, "trigger_status_notification", fake_status, raising=True + ) + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + from tests.test_charge_point_v16 import ChargePoint + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + # wait until server registered CP + for _ in range(100): + if cp_id in cs.charge_points: + break + await asyncio.sleep(0.02) + srv_cp = cs.charge_points[cp_id] + + # enable REM and force boot path + srv_cp._attr_supported_features = {prof.REM} + srv_cp.received_boot_notification = False + setattr(srv_cp, "post_connect_success", False) + + await srv_cp.post_connect() + + assert getattr(srv_cp, "post_connect_success", False) is True + assert called["boot"] == 1 + assert called["status"] == 1 + finally: + task.cancel() + + +# --------------------------------------------------------------------------- + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9122, "cp_id": "CP_postconn_ex_3", "cms": "cms_postconn_ex_3"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_postconn_ex_3"]) +@pytest.mark.parametrize("port", [9122]) +async def test_post_connect_set_availability_cancelled_bubbles( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Inner try around set_availability: asyncio.CancelledError must be re-raised (not swallowed).""" + cs: CentralSystem = setup_config_entry + + from custom_components.ocpp.ocppv16 import ChargePoint as ServerCP + + async def ok_fetch(self): + return None + + async def ok_get_n(self): + return 1 + + async def ok_hb(self): + return 300 + + async def ok_meas(self): + return "Voltage" + + async def ok_set_std(self): + return None + + async def cancelled(self): + raise asyncio.CancelledError() + + monkeypatch.setattr(ServerCP, "fetch_supported_features", ok_fetch, raising=True) + monkeypatch.setattr(ServerCP, "get_number_of_connectors", ok_get_n, raising=True) + monkeypatch.setattr(ServerCP, "get_heartbeat_interval", ok_hb, raising=True) + monkeypatch.setattr(ServerCP, "get_supported_measurands", ok_meas, raising=True) + monkeypatch.setattr( + ServerCP, "set_standard_configuration", ok_set_std, raising=True + ) + monkeypatch.setattr(ServerCP, "set_availability", cancelled, raising=True) + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + from tests.test_charge_point_v16 import ChargePoint + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + for _ in range(100): + if cp_id in cs.charge_points: + break + await asyncio.sleep(0.02) + srv_cp = cs.charge_points[cp_id] + srv_cp._attr_supported_features = {prof.REM} + srv_cp.received_boot_notification = False + + with pytest.raises(asyncio.CancelledError): + await srv_cp.post_connect() + finally: + task.cancel() + + +# --------------------------------------------------------------------------- + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9123, "cp_id": "CP_postconn_ex_4", "cms": "cms_postconn_ex_4"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_postconn_ex_4"]) +@pytest.mark.parametrize("port", [9123]) +async def test_post_connect_trigger_boot_notification_raises_outer_caught( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Outer try: trigger_boot_notification raises -> swallowed; post_connect_success already True.""" + cs: CentralSystem = setup_config_entry + + from custom_components.ocpp.ocppv16 import ChargePoint as ServerCP + + async def ok_fetch(self): + return None + + async def ok_get_n(self): + return 1 + + async def ok_hb(self): + return 300 + + async def ok_meas(self): + return "Voltage" + + async def ok_set_std(self): + return None + + async def ok_avail(self): + return None + + async def boom_boot(self): + raise RuntimeError("boot fail") + + monkeypatch.setattr(ServerCP, "fetch_supported_features", ok_fetch, raising=True) + monkeypatch.setattr(ServerCP, "get_number_of_connectors", ok_get_n, raising=True) + monkeypatch.setattr(ServerCP, "get_heartbeat_interval", ok_hb, raising=True) + monkeypatch.setattr(ServerCP, "get_supported_measurands", ok_meas, raising=True) + monkeypatch.setattr( + ServerCP, "set_standard_configuration", ok_set_std, raising=True + ) + monkeypatch.setattr(ServerCP, "set_availability", ok_avail, raising=True) + monkeypatch.setattr(ServerCP, "trigger_boot_notification", boom_boot, raising=True) + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + from tests.test_charge_point_v16 import ChargePoint + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + for _ in range(100): + if cp_id in cs.charge_points: + break + await asyncio.sleep(0.02) + srv_cp = cs.charge_points[cp_id] + srv_cp._attr_supported_features = {prof.REM} + srv_cp.received_boot_notification = False + setattr(srv_cp, "post_connect_success", False) + + await srv_cp.post_connect() + + assert getattr(srv_cp, "post_connect_success", False) is True + finally: + task.cancel() + + +# --------------------------------------------------------------------------- + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9124, "cp_id": "CP_postconn_ex_5", "cms": "cms_postconn_ex_5"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_postconn_ex_5"]) +@pytest.mark.parametrize("port", [9124]) +async def test_post_connect_trigger_status_notification_raises_outer_caught( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Outer try: trigger_status_notification raises -> swallowed; post_connect_success already True.""" + cs: CentralSystem = setup_config_entry + + from custom_components.ocpp.ocppv16 import ChargePoint as ServerCP + + async def ok_fetch(self): + return None + + async def ok_get_n(self): + return 1 + + async def ok_hb(self): + return 300 + + async def ok_meas(self): + return "Voltage" + + async def ok_set_std(self): + return None + + async def ok_avail(self): + return None + + async def ok_boot(self): + return None + + async def boom_status(self): + raise RuntimeError("status fail") + + monkeypatch.setattr(ServerCP, "fetch_supported_features", ok_fetch, raising=True) + monkeypatch.setattr(ServerCP, "get_number_of_connectors", ok_get_n, raising=True) + monkeypatch.setattr(ServerCP, "get_heartbeat_interval", ok_hb, raising=True) + monkeypatch.setattr(ServerCP, "get_supported_measurands", ok_meas, raising=True) + monkeypatch.setattr( + ServerCP, "set_standard_configuration", ok_set_std, raising=True + ) + monkeypatch.setattr(ServerCP, "set_availability", ok_avail, raising=True) + monkeypatch.setattr(ServerCP, "trigger_boot_notification", ok_boot, raising=True) + monkeypatch.setattr( + ServerCP, "trigger_status_notification", boom_status, raising=True + ) + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + from tests.test_charge_point_v16 import ChargePoint + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + for _ in range(100): + if cp_id in cs.charge_points: + break + await asyncio.sleep(0.02) + srv_cp = cs.charge_points[cp_id] + srv_cp._attr_supported_features = {prof.REM} + srv_cp.received_boot_notification = False + setattr(srv_cp, "post_connect_success", False) + + await srv_cp.post_connect() + assert getattr(srv_cp, "post_connect_success", False) is True + finally: + task.cancel() + + +# --------------------------------------------------------------------------- + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9125, "cp_id": "CP_postconn_ex_6", "cms": "cms_postconn_ex_6"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_postconn_ex_6"]) +@pytest.mark.parametrize("port", [9125]) +async def test_post_connect_update_entry_raises_outer_caught( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Outer try: async_update_entry raises -> swallowed, success flag not set.""" + cs: CentralSystem = setup_config_entry + + from custom_components.ocpp.ocppv16 import ChargePoint as ServerCP + + async def ok_fetch(self): + return None + + async def ok_get_n(self): + return 1 + + async def ok_hb(self): + return 300 + + async def ok_meas(self): + return "Voltage" + + monkeypatch.setattr(ServerCP, "fetch_supported_features", ok_fetch, raising=True) + monkeypatch.setattr(ServerCP, "get_number_of_connectors", ok_get_n, raising=True) + monkeypatch.setattr(ServerCP, "get_heartbeat_interval", ok_hb, raising=True) + monkeypatch.setattr(ServerCP, "get_supported_measurands", ok_meas, raising=True) + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + from tests.test_charge_point_v16 import ChargePoint + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + for _ in range(100): + if cp_id in cs.charge_points: + break + await asyncio.sleep(0.02) + srv_cp = cs.charge_points[cp_id] + + def boom_update_entry(entry, data=None): + raise RuntimeError("update failed") + + monkeypatch.setattr( + srv_cp.hass.config_entries, + "async_update_entry", + boom_update_entry, + raising=True, + ) + + await srv_cp.post_connect() + assert getattr(srv_cp, "post_connect_success", False) is not True + finally: + task.cancel() + + +# --------------------------------------------------------------------------- + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9126, "cp_id": "CP_postconn_ex_7", "cms": "cms_postconn_ex_7"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_postconn_ex_7"]) +@pytest.mark.parametrize("port", [9126]) +async def test_post_connect_set_standard_configuration_raises_outer_caught( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Outer try: set_standard_configuration raises -> swallowed, success flag not set.""" + cs: CentralSystem = setup_config_entry + + from custom_components.ocpp.ocppv16 import ChargePoint as ServerCP + + async def ok_fetch(self): + return None + + async def ok_get_n(self): + return 1 + + async def ok_hb(self): + return 300 + + async def ok_meas(self): + return "Voltage" + + async def boom_std(self): + raise RuntimeError("std cfg fail") + + monkeypatch.setattr(ServerCP, "fetch_supported_features", ok_fetch, raising=True) + monkeypatch.setattr(ServerCP, "get_number_of_connectors", ok_get_n, raising=True) + monkeypatch.setattr(ServerCP, "get_heartbeat_interval", ok_hb, raising=True) + monkeypatch.setattr(ServerCP, "get_supported_measurands", ok_meas, raising=True) + monkeypatch.setattr(ServerCP, "set_standard_configuration", boom_std, raising=True) + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + from tests.test_charge_point_v16 import ChargePoint + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + for _ in range(100): + if cp_id in cs.charge_points: + break + await asyncio.sleep(0.02) + srv_cp = cs.charge_points[cp_id] + + await srv_cp.post_connect() + assert getattr(srv_cp, "post_connect_success", False) is not True + finally: + task.cancel() + + +# --------------------------------------------------------------------------- + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9127, "cp_id": "CP_postconn_ex_8", "cms": "cms_postconn_ex_8"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_postconn_ex_8"]) +@pytest.mark.parametrize("port", [9127]) +async def test_post_connect_number_of_connectors_raises_outer_caught( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Outer try: get_number_of_connectors raises -> swallowed, success flag not set.""" + cs: CentralSystem = setup_config_entry + + from custom_components.ocpp.ocppv16 import ChargePoint as ServerCP + + async def ok_fetch(self): + return None + + async def boom_n(self): + raise RuntimeError("n fail") + + monkeypatch.setattr(ServerCP, "fetch_supported_features", ok_fetch, raising=True) + monkeypatch.setattr(ServerCP, "get_number_of_connectors", boom_n, raising=True) + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + from tests.test_charge_point_v16 import ChargePoint + + cp = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(cp.start()) + try: + for _ in range(100): + if cp_id in cs.charge_points: + break + await asyncio.sleep(0.02) + srv_cp = cs.charge_points[cp_id] + + await srv_cp.post_connect() + assert getattr(srv_cp, "post_connect_success", False) is not True + finally: + task.cancel() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9340, "cp_id": "CP_cov_abb_tx_reset", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_abb_tx_reset"]) +@pytest.mark.parametrize("port", [9340]) +async def test_abb_new_tx_resets_eair_and_meter_start( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """ABB: when a new transactionId appears, per-connector EAIR and meter_start are cleared so a lower EAIR (e.g. 0 Wh) is accepted.""" + cs = setup_config_entry + + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + client = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(client.start()) + try: + await client.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + cpid = srv.settings.cpid + + # --- Seed previous session on connector 1 with EAIR = 15000 Wh (15.0 kWh), txId=111 --- + mv_tx1 = call.MeterValues( + connector_id=1, + transaction_id=111, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "15000", # Wh + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Outlet", + "context": "Sample.Periodic", + } + ], + } + ], + ) + resp = await client.call(mv_tx1) + assert resp is not None + + # EAIR should be normalized to kWh on connector 1. + assert ( + cs.get_unit(cpid, "Energy.Active.Import.Register", connector_id=1) + == "kWh" + ) + assert ( + pytest.approx( + cs.get_metric( + cpid, "Energy.Active.Import.Register", connector_id=1 + ), + rel=1e-6, + ) + == 15.0 + ) + + # --- Simulate ABB behavior: new tx starts and EAIR restarts at 0 Wh with txId=222 --- + mv_tx2_begin = call.MeterValues( + connector_id=1, + transaction_id=222, # new transaction id triggers reset block + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "0", # Wh -> should be accepted after reset + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Outlet", + "context": "Sample.Periodic", + } + ], + } + ], + ) + resp2 = await client.call(mv_tx2_begin) + assert resp2 is not None + + # Verify: transaction id updated to 222, EAIR accepted as 0.0 kWh, and meter_start cleared. + assert int(cs.get_metric(cpid, "Transaction.Id", connector_id=1)) == 222 + assert ( + cs.get_unit(cpid, "Energy.Active.Import.Register", connector_id=1) + == "kWh" + ) + assert ( + pytest.approx( + cs.get_metric(cpid, "Energy.Active.Import.Register", connector_id=1) + or 0.0, + rel=1e-6, + ) + == 0.0 + ) + + # Meter.Start should be cleared (None) at new tx begin per integration logic. + assert cs.get_metric(cpid, "Energy.Meter.Start", connector_id=1) in ( + None, + 0, + 0.0, + ) + + # --- Follow-up periodic sample to ensure increasing values are tracked from the new baseline --- + mv_tx2_next = call.MeterValues( + connector_id=1, + transaction_id=222, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "100", # Wh + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Outlet", + "context": "Sample.Periodic", + } + ], + } + ], + ) + resp3 = await client.call(mv_tx2_next) + assert resp3 is not None + + # EAIR should now be 0.1 kWh on connector 1 from the new baseline. + assert ( + pytest.approx( + cs.get_metric( + cpid, "Energy.Active.Import.Register", connector_id=1 + ), + rel=1e-6, + ) + == 0.1 + ) + + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9341, "cp_id": "CP_cov_ctx_priority", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_ctx_priority"]) +@pytest.mark.parametrize("port", [9341]) +async def test_eair_context_priority_in_bucket( + hass, socket_enabled, cp_id, port, setup_config_entry +): + """Ensure EAIR context priority per bucket: Transaction.End > Sample.Periodic > Sample.Clock.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + client = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(client.start()) + try: + await client.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + cpid = srv.settings.cpid + + # Bucket 1: include three EAIR candidates with different contexts. + # Expect: Transaction.End (13000 Wh) wins -> 13.0 kWh. + mv_bucket1 = call.MeterValues( + connector_id=1, + transaction_id=555, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "11000", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Outlet", + "context": "Sample.Clock", + }, + { + "value": "12000", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Outlet", + "context": "Sample.Periodic", + }, + { + "value": "13000", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Outlet", + "context": "Transaction.End", + }, + # Some unrelated measurand in the same bucket + { + "value": "230", + "measurand": "Voltage", + "unit": "V", + "location": "Outlet", + "context": "Sample.Periodic", + }, + ], + } + ], + ) + resp1 = await client.call(mv_bucket1) + assert resp1 is not None + + assert ( + cs.get_unit(cpid, "Energy.Active.Import.Register", connector_id=1) + == "kWh" + ) + assert cs.get_metric( + cpid, "Energy.Active.Import.Register", connector_id=1 + ) == pytest.approx(13.0, rel=1e-6) + + # Bucket 2: No Transaction.End; Sample.Periodic should beat Sample.Clock. + # Expect: 13100 Wh -> 13.1 kWh. + mv_bucket2 = call.MeterValues( + connector_id=1, + transaction_id=555, + meter_value=[ + { + "timestamp": datetime.now(tz=UTC).isoformat(), + "sampledValue": [ + { + "value": "13090", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Outlet", + "context": "Sample.Clock", + }, + { + "value": "13100", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh", + "location": "Outlet", + "context": "Sample.Periodic", + }, + ], + } + ], + ) + resp2 = await client.call(mv_bucket2) + assert resp2 is not None + + assert ( + cs.get_unit(cpid, "Energy.Active.Import.Register", connector_id=1) + == "kWh" + ) + assert cs.get_metric( + cpid, "Energy.Active.Import.Register", connector_id=1 + ) == pytest.approx(13.1, rel=1e-6) + + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + class ChargePoint(cpclass): """Representation of real client Charge Point.""" diff --git a/tests/test_charge_point_v201.py b/tests/test_charge_point_v201.py index 5b37cb69..34e90ab8 100644 --- a/tests/test_charge_point_v201.py +++ b/tests/test_charge_point_v201.py @@ -606,8 +606,8 @@ async def _test_transaction(hass: HomeAssistant, cs: CentralSystem, cp: ChargePo == ChargePointStatusv16.charging ) assert cs.get_metric(cpid, Measurand.current_export.value) == 0 - assert cs.get_metric(cpid, Measurand.current_import.value) == 6.6 - assert cs.get_metric(cpid, Measurand.current_offered.value) == 36.6 + assert cs.get_metric(cpid, Measurand.current_import.value) == pytest.approx(2.2) + assert cs.get_metric(cpid, Measurand.current_offered.value) == pytest.approx(12.2) assert cs.get_metric(cpid, Measurand.energy_active_export_register.value) == 0 assert cs.get_metric(cpid, Measurand.energy_active_import_register.value) == 0.1 assert cs.get_metric(cpid, Measurand.energy_reactive_export_register.value) == 0 diff --git a/tests/test_more_coverage_chargepoint.py b/tests/test_more_coverage_chargepoint.py new file mode 100644 index 00000000..e12ca832 --- /dev/null +++ b/tests/test_more_coverage_chargepoint.py @@ -0,0 +1,416 @@ +"""Test additional chargepoint paths.""" + +import asyncio +import contextlib +from types import SimpleNamespace + +import pytest +import websockets +from websockets.protocol import State + +from custom_components.ocpp.chargepoint import ChargePoint as BaseCP, MeasurandValue + + +# Reuse the client helpers & fixtures from your main v16 test module. +from .test_charge_point_v16 import wait_ready, ChargePoint + + +@pytest.mark.timeout(10) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9410, "cp_id": "CP_cov_base_defaults", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_base_defaults"]) +@pytest.mark.parametrize("port", [9410]) +async def test_base_default_methods_return_values( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Covers 299, 307, 315, 413, 417, 425, 429, 433, 451–453, 455–457: base defaults & no-op behaviors.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + client = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(client.start()) + try: + await client.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Use the base-class implementations explicitly to cover the base lines, + # even though v16 overrides some of these. + assert ( + await BaseCP.get_number_of_connectors(srv) == srv.num_connectors + ) # L299 + assert await BaseCP.get_supported_measurands(srv) == "" # L307 + assert ( + await BaseCP.get_supported_features(srv) == 0 + ) # L315 (prof.NONE is 0) + + assert await BaseCP.set_availability(srv, True) is False # L413 + assert await BaseCP.start_transaction(srv, 1) is False # L417 + assert await BaseCP.stop_transaction(srv) is False # L425 + assert await BaseCP.reset(srv) is False # L429 + assert await BaseCP.unlock(srv, 1) is False # L433 + + assert await BaseCP.get_configuration(srv, "Foo") is None # L451–453 + assert await BaseCP.configure(srv, "Foo", "Bar") is None # L455–457 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(5) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9413, "cp_id": "CP_cov_handle_call", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_handle_call"]) +@pytest.mark.parametrize("port", [9413]) +async def test_handle_call_notimplemented_sends_call_error( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Covers 520–526: wrapper catches ocpp.exceptions.NotImplementedError and sends CallError JSON.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + client = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(client.start()) + try: + await client.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Patch the exact base alias used by the subclass, and raise the *OCPP* NotImplementedError. + import custom_components.ocpp.chargepoint as cp_mod + from ocpp.exceptions import NotImplementedError as OcppNotImplementedError + + async def boom(self, msg): + # Raise the OCPP exception class that the wrapper actually catches. + raise OcppNotImplementedError(details={"cause": "nyi"}) + + captured = {"payload": None} + + async def fake_send(self, payload): + # _handle_call builds a JSON string via to_json(), then calls _send(...) + captured["payload"] = ( + payload.to_json() if hasattr(payload, "to_json") else payload + ) + + # Patch on the cp alias (the base class your subclass imports as `cp`). + monkeypatch.setattr(cp_mod.cp, "_handle_call", boom, raising=True) + monkeypatch.setattr(cp_mod.cp, "_send", fake_send, raising=True) + + class Msg: + """Minimal message stub compatible with msg.create_call_error(e).""" + + def create_call_error(self, *_, **__): + from types import SimpleNamespace + + # Return an object with to_json() so wrapper turns it into a JSON string. + return SimpleNamespace(to_json=lambda: '{"error":"NotImplemented"}') + + # Invoke: the wrapper should catch the OCPP NotImplementedError and call _send with JSON. + await srv._handle_call(Msg()) + + assert captured["payload"] == '{"error":"NotImplemented"}' + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(5) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9414, "cp_id": "CP_cov_run_paths", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_run_paths"]) +@pytest.mark.parametrize("port", [9414]) +async def test_run_handles_timeout_and_other_exception( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Covers 537 and 540–541: run() swallows TimeoutError and logs other exceptions, then stops.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + client = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(client.start()) + try: + await client.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + stopped = {"count": 0} + + async def fake_stop(): + stopped["count"] += 1 + + monkeypatch.setattr(srv, "stop", fake_stop, raising=True) + + async def raises_timeout(): + await asyncio.sleep(0) + raise TimeoutError("simulated") + + async def raises_other(): + await asyncio.sleep(0) + raise ValueError("simulated") + + # TimeoutError path -> should be swallowed (L537) and then stop() called. + await srv.run([raises_timeout()]) + assert stopped["count"] >= 1 + + # Other exception path -> should be logged via L540–541 and then stop() called again. + await srv.run([raises_other()]) + assert stopped["count"] >= 2 + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(5) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9415, "cp_id": "CP_cov_update_early", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_update_early"]) +@pytest.mark.parametrize("port", [9415]) +async def test_update_returns_early_when_root_device_missing( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Covers 602: update() returns early if the root device cannot be found in the device registry.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + client = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(client.start()) + try: + await client.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Fake registries: no device returned. + import custom_components.ocpp.chargepoint as mod + + class FakeDR: + """Fake DR.""" + + def async_get_device(self, identifiers): + return None + + def async_clear_config_entry(self, config_entry_id): + return None + + @property + def devices(self): + """Fake devices.""" + return {} + + def async_update_device(self, *args, **kwargs): + return None + + def async_get_or_create(self, *args, **kwargs): + return SimpleNamespace(id="dummy") + + class FakeER: + """Fake ER.""" + + def async_clear_config_entry(self, config_entry_id): + return None + + def fake_entries_for_device(_er, _dev_id): + # No entities to update; the loop is exercised anyway. + return [] + + # Patch HA helpers & dispatcher. + monkeypatch.setattr( + mod.device_registry, "async_get", lambda _: FakeDR(), raising=True + ) + monkeypatch.setattr( + mod.entity_registry, "async_get", lambda _: FakeER(), raising=True + ) + + monkeypatch.setattr( + mod.entity_registry, + "async_entries_for_device", + fake_entries_for_device, + raising=True, + ) + monkeypatch.setattr( + mod, "async_dispatcher_send", lambda *args, **kw: None, raising=True + ) + + # Should exit early without error (L602). + await srv.update(srv.settings.cpid) + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(5) +@pytest.mark.parametrize( + "setup_config_entry", + [{"port": 9416, "cp_id": "CP_cov_update_walk", "cms": "cms_services"}], + indirect=True, +) +@pytest.mark.parametrize("cp_id", ["CP_cov_update_walk"]) +@pytest.mark.parametrize("port", [9416]) +async def test_update_traverses_children_and_skips_visited( + hass, socket_enabled, cp_id, port, setup_config_entry, monkeypatch +): + """Covers 612 and 623–624: skips already visited IDs and appends children discovered via via_device_id.""" + cs = setup_config_entry + async with websockets.connect( + f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"] + ) as ws: + client = ChargePoint(f"{cp_id}_client", ws) + task = asyncio.create_task(client.start()) + try: + await client.send_boot_notification() + await wait_ready(cs.charge_points[cp_id]) + srv = cs.charge_points[cp_id] + + # Build a tiny fake device graph: + # root -> child (twice in the values() list to create a duplicate push) + import custom_components.ocpp.chargepoint as mod + + class Dev: + """Fake Dev.""" + + def __init__(self, id, via=None): + self.id = id + self.via_device_id = via + + root = Dev("root", via=None) + child = Dev("child", via="root") + + class FakeDR: + """Fake DR.""" + + def async_clear_config_entry(self, config_entry_id): + return None + + def async_update_device(self, *args, **kwargs): + return None + + def async_get_or_create(self, *args, **kwargs): + return SimpleNamespace(id="dummy") + + def async_get_device(self, identifiers): + return root + + @property + def devices(self): + # Duplicate the child to force the same ID to be appended twice -> will hit continue (L612) + class Container: + def values(self_inner): + return [root, child, child] + + return Container() + + class FakeER: + """Fake ER.""" + + def async_clear_config_entry(self, config_entry_id): + return None + + def fake_entries_for_device(_er, _dev_id): + # No entities to update; the loop is exercised anyway. + return [] + + # Patch HA helpers & dispatcher. + monkeypatch.setattr( + mod.device_registry, "async_get", lambda _: FakeDR(), raising=True + ) + monkeypatch.setattr( + mod.entity_registry, "async_get", lambda _: FakeER(), raising=True + ) + monkeypatch.setattr( + mod.entity_registry, + "async_entries_for_device", + fake_entries_for_device, + raising=True, + ) + monkeypatch.setattr( + mod, "async_dispatcher_send", lambda *args, **kw: None, raising=True + ) + + # No exceptions expected; internal traversal will append 'child' twice, + # so on second pop it will be in 'visited' and trigger L612 'continue'. + await srv.update(srv.settings.cpid) + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await ws.close() + + +@pytest.mark.timeout(5) +async def test_process_measurands_defaults_and_session_energy_v2x(hass, monkeypatch): + """Covers 830–832, 836, 881–887: default EAIR measurand/unit and Energy.Session handling for 2.x.""" + # Minimal CP instance not bound to a real socket. + version = SimpleNamespace(value="2.0.1") + fake_hass = SimpleNamespace( + async_create_task=lambda c: asyncio.create_task(c), + helpers=SimpleNamespace( + entity_component=SimpleNamespace(async_update_entity=lambda eid: None) + ), + ) + fake_entry = SimpleNamespace(entry_id="dummy") + fake_central = SimpleNamespace( + websocket_ping_interval=0, + websocket_ping_timeout=0, + websocket_ping_tries=0, + ) + fake_settings = SimpleNamespace(cpid="cpid_dummy") + fake_conn = SimpleNamespace(state=State.CLOSED) + + srv = BaseCP( + "cp_dummy", + fake_conn, + version, + fake_hass, + fake_entry, + fake_central, + fake_settings, + ) + srv._ocpp_version = "2.0.1" # ensure 2.x path + + # 1) Missing measurand -> defaults to EAIR; missing unit -> defaults to Wh then normalized to kWh. + samples1 = [[MeasurandValue(None, 12345.0, None, None, None, None)]] + srv.process_measurands( + samples1, is_transaction=True, connector_id=1 + ) # <-- no await + + eair = srv._metrics[(1, "Energy.Active.Import.Register")] + assert eair.unit == "kWh" + assert pytest.approx(eair.value, rel=1e-6) == 12.345 + + esess = srv._metrics[(1, "Energy.Session")] + assert esess.unit == "kWh" + assert (esess.value or 0.0) == 0.0 + + # 2) Next periodic EAIR sample increases by 100 Wh -> session delta = 0.1 kWh. + samples2 = [[MeasurandValue(None, 12445.0, None, None, None, None)]] + srv.process_measurands( + samples2, is_transaction=True, connector_id=1 + ) # <-- no await + + eair2 = srv._metrics[(1, "Energy.Active.Import.Register")] + esess2 = srv._metrics[(1, "Energy.Session")] + assert pytest.approx(eair2.value, rel=1e-6) == 12.445 + assert pytest.approx(esess2.value or 0.0, rel=1e-6) == 0.1