Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ dmypy.json

# Pyre type checker
.pyre/

# HA Development
/config/
.DS_Store
277 changes: 238 additions & 39 deletions custom_components/ocpp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from __future__ import annotations

import contextlib
import json
import logging
import re
import ssl

from functools import partial
Expand All @@ -28,6 +30,7 @@
)
from .enums import (
HAChargerServices as csvcs,
HAChargerStatuses as cstat,
)
from .chargepoint import SetVariableResult

Expand Down Expand Up @@ -88,6 +91,10 @@
)


def _norm(s: str) -> str:
return re.sub(r"[^a-z0-9]", "", str(s).lower())


class CentralSystem:
"""Server for handling OCPP connections."""

Expand Down Expand Up @@ -188,6 +195,15 @@ async def create(hass: HomeAssistant, entry: ConfigEntry):
self._server = server
return self

@staticmethod
def _norm_conn(connector_id: int | None) -> int:
if connector_id is None:
return 0
try:
return int(connector_id)
except Exception:
return 0

def select_subprotocol(
self, connection: ServerConnection, subprotocols
) -> Subprotocol | None:
Expand Down Expand Up @@ -273,59 +289,228 @@ async def on_connect(self, websocket: ServerConnection):
charge_point = self.charge_points[cp_id]
await charge_point.reconnect(websocket)

def get_metric(self, id: str, measurand: str):
"""Return last known value for given measurand."""
# allow id to be either cpid or cp_id
def _get_metrics(self, id: str):
"""Return metrics."""
cp_id = self.cpids.get(id, id)
cp = self.charge_points.get(cp_id)
n_connectors = getattr(cp, "num_connectors", 1) or 1
return (
(cp_id, cp._metrics, cp, n_connectors)
if cp is not None
else (None, None, None, None)
)

def get_metric(self, id: str, measurand: str, connector_id: int | None = None):
"""Return last known value for given measurand."""
cp_id, m, cp, n_connectors = self._get_metrics(id)
if cp is None:
return None

def _try_val(key):
with contextlib.suppress(Exception):
val = m[key].value
return val
return None

# 1) Explicit connector_id (including 0): just get it
if connector_id is not None:
conn = self._norm_conn(connector_id)
return _try_val((conn, measurand))

# 2) No connector_id: try CHARGER level (conn=0)
val = _try_val((0, measurand))
if val is not None:
return val

# 3) Legacy "flat" key (before the connector support)
with contextlib.suppress(Exception):
val = m[measurand].value
if val is not None:
return val

# 4) Fallback to connector 1 (old tests often expect this)
if n_connectors >= 1:
val = _try_val((1, measurand))
if val is not None:
return val

# 5) Last resort: find the first connector 2..N with value
for c in range(2, int(n_connectors) + 1):
val = _try_val((c, measurand))
if val is not None:
return val

if cp_id in self.charge_points:
return self.charge_points[cp_id]._metrics[measurand].value
return None

def del_metric(self, id: str, measurand: str):
def del_metric(self, id: str, measurand: str, connector_id: int | None = None):
"""Set given measurand to None."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
cp_id, m, cp, n_connectors = self._get_metrics(id)

if m is None:
return None

if self.cpids.get(cp_id) in self.charge_points:
self.charge_points[cp_id]._metrics[measurand].value = None
conn = self._norm_conn(connector_id)
try:
m[(conn, measurand)].value = None
except Exception:
if conn == 0:
with contextlib.suppress(Exception):
m[measurand].value = None
return None

def get_unit(self, id: str, measurand: str):
def get_unit(self, id: str, measurand: str, connector_id: int | None = None):
"""Return unit of given measurand."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
cp_id, m, cp, n_connectors = self._get_metrics(id)

if cp is None:
return None

def _try_unit(key):
with contextlib.suppress(Exception):
return m[key].unit
return None

if connector_id is not None:
conn = self._norm_conn(connector_id)
return _try_unit((conn, measurand))

val = _try_unit((0, measurand))
if val is not None:
return val

with contextlib.suppress(Exception):
val = m[measurand].unit
if val is not None:
return val

if n_connectors >= 1:
val = _try_unit((1, measurand))
if val is not None:
return val

for c in range(2, int(n_connectors) + 1):
val = _try_unit((c, measurand))
if val is not None:
return val

if cp_id in self.charge_points:
return self.charge_points[cp_id]._metrics[measurand].unit
return None

def get_ha_unit(self, id: str, measurand: str):
def get_ha_unit(self, id: str, measurand: str, connector_id: int | None = None):
"""Return home assistant unit of given measurand."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
cp_id, m, cp, n_connectors = self._get_metrics(id)

if cp is None:
return None

def _try_ha_unit(key):
with contextlib.suppress(Exception):
return m[key].ha_unit
return None

if connector_id is not None:
conn = self._norm_conn(connector_id)
return _try_ha_unit((conn, measurand))

val = _try_ha_unit((0, measurand))
if val is not None:
return val

with contextlib.suppress(Exception):
val = m[measurand].ha_unit
if val is not None:
return val

if n_connectors >= 1:
val = _try_ha_unit((1, measurand))
if val is not None:
return val

for c in range(2, int(n_connectors) + 1):
val = _try_ha_unit((c, measurand))
if val is not None:
return val

if cp_id in self.charge_points:
return self.charge_points[cp_id]._metrics[measurand].ha_unit
return None

def get_extra_attr(self, id: str, measurand: str):
"""Return last known extra attributes for given measurand."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
def get_extra_attr(self, id: str, measurand: str, connector_id: int | None = None):
"""Return extra attributes for given measurand."""
cp_id, m, cp, n_connectors = self._get_metrics(id)

if cp is None:
return None

def _try_extra(key):
with contextlib.suppress(Exception):
return m[key].extra_attr
return None

if connector_id is not None:
conn = self._norm_conn(connector_id)
return _try_extra((conn, measurand))

val = _try_extra((0, measurand))
if val is not None:
return val

with contextlib.suppress(Exception):
val = m[measurand].extra_attr
if val is not None:
return val

if n_connectors >= 1:
val = _try_extra((1, measurand))
if val is not None:
return val

for c in range(2, int(n_connectors) + 1):
val = _try_extra((c, measurand))
if val is not None:
return val

if cp_id in self.charge_points:
return self.charge_points[cp_id]._metrics[measurand].extra_attr
return None

def get_available(self, id: str):
"""Return whether the charger is available."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
def get_available(self, id: str, connector_id: int | None = None):
"""Return whether the charger (or a specific connector) is available."""
cp_id, m, cp, n_connectors = self._get_metrics(id)

if cp_id in self.charge_points:
return self.charge_points[cp_id].status == STATE_OK
return False
if cp is None:
return None

if self._norm_conn(connector_id) == 0:
return cp.status == STATE_OK

status_val = None
with contextlib.suppress(Exception):
status_val = m[
(self._norm_conn(connector_id), cstat.status_connector.value)
].value

if not status_val:
try:
flat = m[cstat.status_connector.value]
if hasattr(flat, "extra_attr"):
status_val = flat.extra_attr.get(
self._norm_conn(connector_id)
) or getattr(flat, "value", None)
except Exception:
pass

if not status_val:
return cp.status == STATE_OK

ok_statuses_norm = {
"available",
"preparing",
"charging",
"suspendedev",
"suspendedevse",
"finishing",
"occupied",
"reserved",
}

ret = _norm(status_val) in ok_statuses_norm
return ret

def get_supported_features(self, id: str):
"""Return what profiles the charger supports."""
Expand All @@ -336,32 +521,46 @@ def get_supported_features(self, id: str):
return self.charge_points[cp_id].supported_features
return 0

async def set_max_charge_rate_amps(self, id: str, value: float):
async def set_max_charge_rate_amps(
self, id: str, value: float, connector_id: int = 0
):
"""Set the maximum charge rate in amps."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)

if cp_id in self.charge_points:
return await self.charge_points[cp_id].set_charge_rate(limit_amps=value)
return await self.charge_points[cp_id].set_charge_rate(
limit_amps=value, conn_id=connector_id
)
return False

async def set_charger_state(self, id: str, service_name: str, state: bool = True):
async def set_charger_state(
self,
id: str,
service_name: str,
state: bool = True,
connector_id: int | None = 1,
):
"""Carry out requested service/state change on connected charger."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)

resp = False
if cp_id in self.charge_points:
if service_name == csvcs.service_availability.name:
resp = await self.charge_points[cp_id].set_availability(state)
resp = await self.charge_points[cp_id].set_availability(
state, connector_id=connector_id
)
if service_name == csvcs.service_charge_start.name:
resp = await self.charge_points[cp_id].start_transaction()
resp = await self.charge_points[cp_id].start_transaction(
connector_id=connector_id
)
if service_name == csvcs.service_charge_stop.name:
resp = await self.charge_points[cp_id].stop_transaction()
if service_name == csvcs.service_reset.name:
resp = await self.charge_points[cp_id].reset()
if service_name == csvcs.service_unlock.name:
resp = await self.charge_points[cp_id].unlock()
resp = await self.charge_points[cp_id].unlock(connector_id=connector_id)
return resp

def device_info(self):
Expand Down
Loading
Loading