Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions custom_components/ocpp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,19 @@ async def on_connect(self, websocket: ServerConnection):
await charge_point.reconnect(websocket)

def _get_metrics(self, id: str):
"""Return metrics."""
"""Return (cp_id, metrics mapping, cp instance, safe int num_connectors)."""
cp_id = self.cpids.get(id, id)
cp = self.charge_points.get(cp_id)
n_connectors = getattr(cp, "num_connectors", 1) or 1

def _safe_int(value, default=1):
try:
iv = int(value)
return iv if iv > 0 else default
except Exception:
return default

n_connectors = _safe_int(getattr(cp, "num_connectors", 1), default=1)

return (
(cp_id, cp._metrics, cp, n_connectors)
if cp is not None
Expand Down
194 changes: 151 additions & 43 deletions custom_components/ocpp/chargepoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,18 @@ async def post_connect(self):

if prof.REM in self._attr_supported_features:
if self.received_boot_notification is False:
await self.trigger_boot_notification()
await self.trigger_status_notification()
try:
await asyncio.wait_for(
self.trigger_boot_notification(), timeout=3
)
except Exception as ex:
_LOGGER.debug("trigger_boot_notification ignored: %s", ex)
try:
await asyncio.wait_for(
self.trigger_status_notification(), timeout=3
)
except Exception as ex:
_LOGGER.debug("trigger_status_notification ignored: %s", ex)

except Exception as e:
_LOGGER.debug("post_connect aborted non-fatally: %s", e)
Expand Down Expand Up @@ -464,48 +474,58 @@ async def monitor_connection(self):
self._metrics[(0, cstat.latency_pong.value)].unit = "ms"
connection = self._connection
timeout_counter = 0

# Add backstop to start post connect for non-compliant chargers
# after 10s to allow for when a boot notification has not been received
await asyncio.sleep(10)
if not self.post_connect_success:
self.hass.async_create_task(self.post_connect())

while connection.state is State.OPEN:
try:
await asyncio.sleep(self.cs_settings.websocket_ping_interval)
time0 = time.perf_counter()
latency_ping = self.cs_settings.websocket_ping_timeout * 1000
latency_pong = self.cs_settings.websocket_ping_timeout * 1000
pong_waiter = await asyncio.wait_for(
Comment on lines +489 to 490
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect latency initialization.

Both latency_ping and latency_pong are initialized to the same timeout value before the ping is sent. This means latency_ping won't reflect the actual ping time if the ping succeeds.

Move the latency_pong initialization after the ping completes:

                time0 = time.perf_counter()
                latency_ping = self.cs_settings.websocket_ping_timeout * 1000
-               latency_pong = self.cs_settings.websocket_ping_timeout * 1000
                pong_waiter = await asyncio.wait_for(
                    connection.ping(), timeout=self.cs_settings.websocket_ping_timeout
                )
                time1 = time.perf_counter()
                latency_ping = round(time1 - time0, 3) * 1000
+               latency_pong = self.cs_settings.websocket_ping_timeout * 1000
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
latency_pong = self.cs_settings.websocket_ping_timeout * 1000
pong_waiter = await asyncio.wait_for(
time0 = time.perf_counter()
latency_ping = self.cs_settings.websocket_ping_timeout * 1000
pong_waiter = await asyncio.wait_for(
connection.ping(), timeout=self.cs_settings.websocket_ping_timeout
)
time1 = time.perf_counter()
latency_ping = round(time1 - time0, 3) * 1000
latency_pong = self.cs_settings.websocket_ping_timeout * 1000
🤖 Prompt for AI Agents
In custom_components/ocpp/chargepoint.py around lines 489-490, latency_ping and
latency_pong are both set to the websocket_ping_timeout before the ping is
actually sent so latency_ping never reflects the real ping round-trip; instead,
record the send timestamp and set latency_ping when the ping is sent/completed
and only initialize latency_pong after the await for the pong finishes (i.e.,
move the latency_pong assignment to after the wait_for returns or raises) so
latency_ping and latency_pong reflect actual measured times and keep the same
timeout usage for the wait_for call.

connection.ping(), timeout=self.cs_settings.websocket_ping_timeout
)
time1 = time.perf_counter()
latency_ping = round(time1 - time0, 3) * 1000
latency_pong = self.cs_settings.websocket_ping_timeout * 1000

await asyncio.wait_for(
pong_waiter, timeout=self.cs_settings.websocket_ping_timeout
)
timeout_counter = 0
time2 = time.perf_counter()
latency_pong = round(time2 - time1, 3) * 1000

_LOGGER.debug(
f"Connection latency from '{self.cs_settings.csid}' to '{self.id}': ping={latency_ping} ms, pong={latency_pong} ms",
f"Connection latency from '{self.cs_settings.csid}' to '{self.id}': "
f"ping={latency_ping} ms, pong={latency_pong} ms",
)
self._metrics[(0, cstat.latency_ping.value)].value = latency_ping
self._metrics[(0, cstat.latency_pong.value)].value = latency_pong

except TimeoutError as timeout_exception:
timeout_counter += 1
_LOGGER.debug(
f"Connection latency from '{self.cs_settings.csid}' to '{self.id}': ping={latency_ping} ms, pong={latency_pong} ms",
f"Connection latency from '{self.cs_settings.csid}' to '{self.id}': "
f"ping={latency_ping} ms, pong={latency_pong} ms",
)
self._metrics[(0, cstat.latency_ping.value)].value = latency_ping
self._metrics[(0, cstat.latency_pong.value)].value = latency_pong
timeout_counter += 1

if timeout_counter > self.cs_settings.websocket_ping_tries:
_LOGGER.debug(
f"Connection to '{self.id}' timed out after '{self.cs_settings.websocket_ping_tries}' ping tries",
)
raise timeout_exception
else:
continue
except Exception as ex:
_LOGGER.debug(f"monitor_connection stopping due to exception: {ex}")
break

async def _handle_call(self, msg):
try:
Expand Down Expand Up @@ -646,8 +666,15 @@ def get_authorization_status(self, id_tag):
)
return auth_status

def process_phases(self, data: list[MeasurandValue], connector_id: int | None = 0):
"""Process phase data from meter values."""
def process_phases(self, data: list[MeasurandValue], connector_id: int = 0):
"""Process per-phase MeterValues and aggregate them into per-connector metrics.

Rules:
- Voltage: average (L1-N/L2-N/L3-N or L-L divided by √3); fall back to averaging L1/L2/L3 if needed.
- Current.*: average of L1/L2/L3 (ignore N).
- Power.Factor: **average** of L1/L2/L3 (ignore N). *Do not sum; unit is dimensionless and may be missing.*
- Other (e.g. Power.Active.*): sum of L1/L2/L3 (ignore N).
"""
# For single-connector chargers, use connector 1.
n_connectors = getattr(self, CONF_NUM_CONNECTORS, DEFAULT_NUM_CONNECTORS) or 1
if connector_id in (None, 0):
Expand All @@ -658,78 +685,129 @@ def process_phases(self, data: list[MeasurandValue], connector_id: int | None =
except Exception:
target_cid = 1 if n_connectors == 1 else 0

def average_of_nonzero(values):
nonzero_values: list = [v for v in values if v != 0.0]
nof_values: int = len(nonzero_values)
average = sum(nonzero_values) / nof_values if nof_values > 0 else 0
return average
def average_of_nonzero(values: list[float]) -> float:
"""Average only non-zero values; return 0.0 if all are zero or list is empty."""
nonzero = [v for v in values if v != 0.0]
return (sum(nonzero) / len(nonzero)) if nonzero else 0.0

measurand_data: dict[str, dict[str, float]] = {}

measurand_data = {}
for item in data:
# create ordered Dict for each measurand, eg {"voltage":{"unit":"V","L1-N":"230"...}}
measurand = item.measurand
phase = item.phase
value = item.value
unit = item.unit
context = item.context
if measurand is not None and phase is not None and unit is not None:
if measurand not in measurand_data:
measurand_data[measurand] = {}

if measurand is None or phase is None:
continue

if measurand not in measurand_data:
measurand_data[measurand] = {}

if unit is not None:
measurand_data[measurand][om.unit.value] = unit
measurand_data[measurand][phase] = value
self._metrics[(target_cid, measurand)].unit = unit
self._metrics[(target_cid, measurand)].extra_attr[om.unit.value] = unit
self._metrics[(target_cid, measurand)].extra_attr[phase] = value

measurand_data[measurand][phase] = value
self._metrics[(target_cid, measurand)].extra_attr[phase] = value
if context is not None:
self._metrics[(target_cid, measurand)].extra_attr[om.context.value] = (
context
)

line_phases = [Phase.l1.value, Phase.l2.value, Phase.l3.value, Phase.n.value]
line_phases_all = [
Phase.l1.value,
Phase.l2.value,
Phase.l3.value,
Phase.n.value,
]
phases_l123 = [Phase.l1.value, Phase.l2.value, Phase.l3.value]
line_to_neutral_phases = [Phase.l1_n.value, Phase.l2_n.value, Phase.l3_n.value]
line_to_line_phases = [Phase.l1_l2.value, Phase.l2_l3.value, Phase.l3_l1.value]

def _avg_l123(phase_info: dict) -> float:
return average_of_nonzero(
[phase_info.get(phase, 0.0) for phase in phases_l123]
)

def _sum_l123(phase_info: dict) -> float:
return sum(phase_info.get(phase, 0.0) for phase in phases_l123)

for metric, phase_info in measurand_data.items():
metric_value = None
metric_value: float | None = None
mname = str(metric)

if metric in [Measurand.voltage.value]:
if not phase_info.keys().isdisjoint(line_to_neutral_phases):
# Line to neutral voltages are averaged
metric_value = average_of_nonzero(
[phase_info.get(phase, 0) for phase in line_to_neutral_phases]
[phase_info.get(phase, 0.0) for phase in line_to_neutral_phases]
)
elif not phase_info.keys().isdisjoint(line_to_line_phases):
# Line to line voltages are averaged and converted to line to neutral
metric_value = average_of_nonzero(
[phase_info.get(phase, 0) for phase in line_to_line_phases]
[phase_info.get(phase, 0.0) for phase in line_to_line_phases]
) / sqrt(3)
elif not phase_info.keys().isdisjoint(line_phases):
elif not phase_info.keys().isdisjoint(line_phases_all):
# Workaround for chargers that don't follow engineering convention
# Assumes voltages are line to neutral
metric_value = average_of_nonzero(
[phase_info.get(phase, 0) for phase in line_phases]
)
metric_value = _avg_l123(phase_info)

else:
if not phase_info.keys().isdisjoint(line_phases):
metric_value = sum(
phase_info.get(phase, 0) for phase in line_phases
)
elif not phase_info.keys().isdisjoint(line_to_neutral_phases):
# Workaround for some chargers that erroneously use line to neutral for current
metric_value = sum(
phase_info.get(phase, 0) for phase in line_to_neutral_phases
)
is_current = mname.lower().startswith("current")
if is_current:
# Current.* shown per phase -> avg of L1/L2/L3, ignore N
if not phase_info.keys().isdisjoint(phases_l123):
metric_value = _avg_l123(phase_info)
elif not phase_info.keys().isdisjoint(line_to_neutral_phases):
# Workaround for some chargers that erroneously use line to neutral for current
metric_value = average_of_nonzero(
[
phase_info.get(phase, 0.0)
for phase in line_to_neutral_phases
]
)

# Special-case: Power.Factor must be averaged, never summed
elif metric == Measurand.power_factor.value:
if not phase_info.keys().isdisjoint(phases_l123):
metric_value = _avg_l123(phase_info)
elif not phase_info.keys().isdisjoint(line_to_neutral_phases):
metric_value = average_of_nonzero(
[phase_info.get(p, 0.0) for p in line_to_neutral_phases]
)
# If only a single phase value exists, just pass it through
else:
metric_value = next(
(v for k, v in phase_info.items() if k != om.unit.value),
None,
)

else:
# Other (e.g. Power.*): total is sum over phases
if not phase_info.keys().isdisjoint(phases_l123):
metric_value = _sum_l123(phase_info)
elif not phase_info.keys().isdisjoint(line_to_neutral_phases):
metric_value = sum(
phase_info.get(phase, 0.0)
for phase in line_to_neutral_phases
)

if metric_value is not None:
metric_unit = phase_info.get(om.unit.value)
m = self._metrics[(target_cid, metric)]

if metric_unit == DEFAULT_POWER_UNIT:
m.value = metric_value / 1000
m.unit = HA_POWER_UNIT
self._metrics[(target_cid, metric)].value = metric_value / 1000
self._metrics[(target_cid, metric)].unit = HA_POWER_UNIT
elif metric_unit == DEFAULT_ENERGY_UNIT:
m.value = metric_value / 1000
m.unit = HA_ENERGY_UNIT
self._metrics[(target_cid, metric)].value = metric_value / 1000
self._metrics[(target_cid, metric)].unit = HA_ENERGY_UNIT
else:
m.value = metric_value
m.unit = metric_unit
self._metrics[(target_cid, metric)].value = metric_value
self._metrics[(target_cid, metric)].unit = metric_unit

@staticmethod
def get_energy_kwh(measurand_value: MeasurandValue) -> float:
Expand Down Expand Up @@ -803,6 +881,36 @@ def process_measurands(
and is_transaction
and self._ocpp_version != "1.6"
):
# Ensure session metric is present and well-formed
sess_key = (connector_id, "Energy.Session")
if sess_key not in self._metrics:
self._metrics[sess_key] = Metric(0.0, HA_ENERGY_UNIT)
else:
if self._metrics[sess_key].unit is None:
self._metrics[sess_key].unit = HA_ENERGY_UNIT
if self._metrics[sess_key].value is None:
self._metrics[sess_key].value = 0.0

# Bootstrap baseline for 2.x if missing:
ms_key = (
connector_id,
csess.meter_start.value,
) # "Energy.Meter.Start"
if ms_key not in self._metrics:
# Create the slot with kWh unit to match normalized EAIR above
self._metrics[ms_key] = Metric(None, HA_ENERGY_UNIT)

ms_metric = self._metrics[ms_key]
if ms_metric.value is None:
# First EAIR in this transaction: set baseline to current EAIR (kWh)
ms_metric.value = value
# Keep session at 0.0 for the baseline sample
else:
# Compute positive delta only (guard against counter resets)
delta = value - ms_metric.value
if delta >= 0:
self._metrics[sess_key].value = round(delta, 6)

if (
self._charger_reports_session_energy
and context != ReadingContext.transaction_begin.value
Expand Down
Loading
Loading