Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ dmypy.json

# Pyre type checker
.pyre/

# HA Development
/config/
.DS_Store
277 changes: 238 additions & 39 deletions custom_components/ocpp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from __future__ import annotations

import contextlib
import json
import logging
import re
import ssl

from functools import partial
Expand All @@ -28,6 +30,7 @@
)
from .enums import (
HAChargerServices as csvcs,
HAChargerStatuses as cstat,
)
from .chargepoint import SetVariableResult

Expand Down Expand Up @@ -88,6 +91,10 @@
)


def _norm(s: str) -> str:
return re.sub(r"[^a-z0-9]", "", str(s).lower())


class CentralSystem:
"""Server for handling OCPP connections."""

Expand Down Expand Up @@ -188,6 +195,15 @@ async def create(hass: HomeAssistant, entry: ConfigEntry):
self._server = server
return self

@staticmethod
def _norm_conn(connector_id: int | None) -> int:
if connector_id is None:
return 0
try:
return int(connector_id)
except Exception:
return 0

def select_subprotocol(
self, connection: ServerConnection, subprotocols
) -> Subprotocol | None:
Expand Down Expand Up @@ -273,59 +289,228 @@ async def on_connect(self, websocket: ServerConnection):
charge_point = self.charge_points[cp_id]
await charge_point.reconnect(websocket)

def get_metric(self, id: str, measurand: str):
"""Return last known value for given measurand."""
# allow id to be either cpid or cp_id
def _get_metrics(self, id: str):
"""Return metrics."""
cp_id = self.cpids.get(id, id)
cp = self.charge_points.get(cp_id)
n_connectors = getattr(cp, "num_connectors", 1) or 1
return (
(cp_id, cp._metrics, cp, n_connectors)
if cp is not None
else (None, None, None, None)
)

def get_metric(self, id: str, measurand: str, connector_id: int | None = None):
"""Return last known value for given measurand."""
cp_id, m, cp, n_connectors = self._get_metrics(id)
if cp is None:
return None

def _try_val(key):
with contextlib.suppress(Exception):
val = m[key].value
return val
return None

# 1) Explicit connector_id (including 0): just get it
if connector_id is not None:
conn = self._norm_conn(connector_id)
return _try_val((conn, measurand))

# 2) No connector_id: try CHARGER level (conn=0)
val = _try_val((0, measurand))
if val is not None:
return val

# 3) Legacy "flat" key (before the connector support)
with contextlib.suppress(Exception):
val = m[measurand].value
if val is not None:
return val

# 4) Fallback to connector 1 (old tests often expect this)
if n_connectors >= 1:
val = _try_val((1, measurand))
if val is not None:
return val

# 5) Last resort: find the first connector 2..N with value
for c in range(2, int(n_connectors) + 1):
val = _try_val((c, measurand))
if val is not None:
return val

if cp_id in self.charge_points:
return self.charge_points[cp_id]._metrics[measurand].value
return None

def del_metric(self, id: str, measurand: str):
def del_metric(self, id: str, measurand: str, connector_id: int | None = None):
"""Set given measurand to None."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
cp_id, m, cp, n_connectors = self._get_metrics(id)

if m is None:
return None

if self.cpids.get(cp_id) in self.charge_points:
self.charge_points[cp_id]._metrics[measurand].value = None
conn = self._norm_conn(connector_id)
try:
m[(conn, measurand)].value = None
except Exception:
if conn == 0:
with contextlib.suppress(Exception):
m[measurand].value = None
return None

def get_unit(self, id: str, measurand: str):
def get_unit(self, id: str, measurand: str, connector_id: int | None = None):
"""Return unit of given measurand."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
cp_id, m, cp, n_connectors = self._get_metrics(id)

if cp is None:
return None

def _try_unit(key):
with contextlib.suppress(Exception):
return m[key].unit
return None

if connector_id is not None:
conn = self._norm_conn(connector_id)
return _try_unit((conn, measurand))

val = _try_unit((0, measurand))
if val is not None:
return val

with contextlib.suppress(Exception):
val = m[measurand].unit
if val is not None:
return val

if n_connectors >= 1:
val = _try_unit((1, measurand))
if val is not None:
return val

for c in range(2, int(n_connectors) + 1):
val = _try_unit((c, measurand))
if val is not None:
return val

if cp_id in self.charge_points:
return self.charge_points[cp_id]._metrics[measurand].unit
return None

def get_ha_unit(self, id: str, measurand: str):
def get_ha_unit(self, id: str, measurand: str, connector_id: int | None = None):
"""Return home assistant unit of given measurand."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
cp_id, m, cp, n_connectors = self._get_metrics(id)

if cp is None:
return None

def _try_ha_unit(key):
with contextlib.suppress(Exception):
return m[key].ha_unit
return None

if connector_id is not None:
conn = self._norm_conn(connector_id)
return _try_ha_unit((conn, measurand))

val = _try_ha_unit((0, measurand))
if val is not None:
return val

with contextlib.suppress(Exception):
val = m[measurand].ha_unit
if val is not None:
return val

if n_connectors >= 1:
val = _try_ha_unit((1, measurand))
if val is not None:
return val

for c in range(2, int(n_connectors) + 1):
val = _try_ha_unit((c, measurand))
if val is not None:
return val

if cp_id in self.charge_points:
return self.charge_points[cp_id]._metrics[measurand].ha_unit
return None

def get_extra_attr(self, id: str, measurand: str):
"""Return last known extra attributes for given measurand."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
def get_extra_attr(self, id: str, measurand: str, connector_id: int | None = None):
"""Return extra attributes for given measurand."""
cp_id, m, cp, n_connectors = self._get_metrics(id)

if cp is None:
return None

def _try_extra(key):
with contextlib.suppress(Exception):
return m[key].extra_attr
return None

if connector_id is not None:
conn = self._norm_conn(connector_id)
return _try_extra((conn, measurand))

val = _try_extra((0, measurand))
if val is not None:
return val

with contextlib.suppress(Exception):
val = m[measurand].extra_attr
if val is not None:
return val

if n_connectors >= 1:
val = _try_extra((1, measurand))
if val is not None:
return val

for c in range(2, int(n_connectors) + 1):
val = _try_extra((c, measurand))
if val is not None:
return val

if cp_id in self.charge_points:
return self.charge_points[cp_id]._metrics[measurand].extra_attr
return None

def get_available(self, id: str):
"""Return whether the charger is available."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)
def get_available(self, id: str, connector_id: int | None = None):
"""Return whether the charger (or a specific connector) is available."""
cp_id, m, cp, n_connectors = self._get_metrics(id)

if cp_id in self.charge_points:
return self.charge_points[cp_id].status == STATE_OK
return False
if cp is None:
return None

if self._norm_conn(connector_id) == 0:
return cp.status == STATE_OK

status_val = None
with contextlib.suppress(Exception):
status_val = m[
(self._norm_conn(connector_id), cstat.status_connector.value)
].value

if not status_val:
try:
flat = m[cstat.status_connector.value]
if hasattr(flat, "extra_attr"):
status_val = flat.extra_attr.get(
self._norm_conn(connector_id)
) or getattr(flat, "value", None)
except Exception:
pass

if not status_val:
return cp.status == STATE_OK

ok_statuses_norm = {
"available",
"preparing",
"charging",
"suspendedev",
"suspendedevse",
"finishing",
"occupied",
"reserved",
}

ret = _norm(status_val) in ok_statuses_norm
return ret

def get_supported_features(self, id: str):
"""Return what profiles the charger supports."""
Expand All @@ -336,32 +521,46 @@ def get_supported_features(self, id: str):
return self.charge_points[cp_id].supported_features
return 0

async def set_max_charge_rate_amps(self, id: str, value: float):
async def set_max_charge_rate_amps(
self, id: str, value: float, connector_id: int = 0
):
"""Set the maximum charge rate in amps."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)

if cp_id in self.charge_points:
return await self.charge_points[cp_id].set_charge_rate(limit_amps=value)
return await self.charge_points[cp_id].set_charge_rate(
limit_amps=value, conn_id=connector_id
)
return False

async def set_charger_state(self, id: str, service_name: str, state: bool = True):
async def set_charger_state(
self,
id: str,
service_name: str,
state: bool = True,
connector_id: int | None = 1,
):
"""Carry out requested service/state change on connected charger."""
# allow id to be either cpid or cp_id
cp_id = self.cpids.get(id, id)

resp = False
if cp_id in self.charge_points:
if service_name == csvcs.service_availability.name:
resp = await self.charge_points[cp_id].set_availability(state)
resp = await self.charge_points[cp_id].set_availability(
state, connector_id=connector_id
)
if service_name == csvcs.service_charge_start.name:
resp = await self.charge_points[cp_id].start_transaction()
resp = await self.charge_points[cp_id].start_transaction(
connector_id=connector_id
)
if service_name == csvcs.service_charge_stop.name:
resp = await self.charge_points[cp_id].stop_transaction()
if service_name == csvcs.service_reset.name:
resp = await self.charge_points[cp_id].reset()
if service_name == csvcs.service_unlock.name:
resp = await self.charge_points[cp_id].unlock()
resp = await self.charge_points[cp_id].unlock(connector_id=connector_id)
return resp

def device_info(self):
Expand Down
Loading
Loading