Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 79 additions & 119 deletions custom_components/ocpp/ocppv16.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,36 +377,6 @@ async def clear_profile(
_LOGGER.debug("ClearChargingProfile raised %s (ignored)", ex)
return False

def _profile_ids_for(
self, conn_id: int, purpose: str, tx_id: int | None = None
) -> tuple[int, int]:
"""Return (chargingProfileId, stackLevel) unique per (purpose, connector).

- Keeps IDs small and stable across restarts.
- For TxProfile you may include tx_id to avoid clashes if multiple are alive.
"""
PURPOSE_CODE = {
"ChargePointMaxProfile": 1,
"TxDefaultProfile": 2,
"TxProfile": 3,
}
if purpose == "ChargePointMaxProfile":
conn_seg = 0
else:
try:
conn_seg = max(1, int(conn_id or 1))
except Exception:
conn_seg = 1

base = 1000
pid = base + PURPOSE_CODE[purpose] + conn_seg * 10

if purpose == "TxProfile" and tx_id is not None:
pid = pid * 1000 + (int(tx_id) % 1000)

stack_level = 1
return pid, stack_level

async def set_charge_rate(
self,
limit_amps: int = 32,
Expand All @@ -417,118 +387,108 @@ async def set_charge_rate(
"""Set charge rate."""
if profile is not None:
try:
resp = await self.call(
call.SetChargingProfile(
connector_id=int(conn_id), cs_charging_profiles=profile
)
req = call.SetChargingProfile(
connector_id=int(conn_id), cs_charging_profiles=profile
)
resp = await self.call(req)
if resp.status == ChargingProfileStatus.accepted:
return True
_LOGGER.warning("Custom SetChargingProfile rejected: %s", resp.status)
except Exception as ex:
_LOGGER.warning("Custom SetChargingProfile failed: %s", ex)
await self.notify_ha(
"Warning: Set charging profile failed with response Exception"
)
return False

if prof.SMART not in self._attr_supported_features:
_LOGGER.info("Smart charging is not supported by this charger")
return False

resp_units = await self.get_configuration(
# Determine allowed unit (default to Amps if not reported)
units_resp = await self.get_configuration(
ckey.charging_schedule_allowed_charging_rate_unit.value
)
if resp_units is None:
_LOGGER.warning("Failed to query charging rate unit, assuming Amps")
resp_units = om.current.value
if not units_resp:
_LOGGER.debug("Charging rate unit not reported; assuming Amps")
units_resp = om.current.value

use_amps = om.current.value in resp_units
limit_val = float(limit_amps if use_amps else limit_watts)
unit_val = (
use_amps = om.current.value in units_resp
limit_value = float(limit_amps if use_amps else limit_watts)
units_value = (
ChargingRateUnitType.amps.value
if use_amps
else ChargingRateUnitType.watts.value
)

# Build attempt order (CPMax -> TxDefault -> TxProfile if active)
attempts: list[tuple[int, str]] = []
attempts.append((0, "ChargePointMaxProfile"))
if conn_id and conn_id > 0:
attempts.append((conn_id, "TxDefaultProfile"))

has_active = bool(getattr(self, "active_transaction_id", 0))
if has_active:
tx_conn = next(
(c for c, tx in getattr(self, "_active_tx", {}).items() if tx),
conn_id or 1,
)
attempts.append((tx_conn, "TxProfile"))

await self.clear_profile(
None, ChargingProfilePurposeType.charge_point_max_profile
)
if conn_id and conn_id > 0:
await self.clear_profile(
conn_id, ChargingProfilePurposeType.tx_default_profile
# Read max stack level (default to 1 on parse errors)
try:
stack_level_resp = await self.get_configuration(
ckey.charge_profile_max_stack_level.value
)
stack_level = int(stack_level_resp)
except Exception:
stack_level = 1

def _mk_profile(purpose: str, cid: int) -> dict:
tx_id = (
self.active_transaction_id
if (purpose == "TxProfile" and has_active)
else None
)
pid, stack = self._profile_ids_for(cid, purpose, tx_id=tx_id)
# Helper to build a simple relative schedule with one period
def _mk_schedule(_units: str, _limit: float) -> dict:
return {
om.charging_profile_id.value: pid,
om.stack_level.value: stack,
om.charging_profile_kind.value: ChargingProfileKindType.relative.value,
om.charging_profile_purpose.value: purpose,
om.charging_schedule.value: {
om.charging_rate_unit.value: unit_val,
om.charging_schedule_period.value: [
{om.start_period.value: 0, om.limit.value: limit_val}
],
},
om.charging_rate_unit.value: _units,
om.charging_schedule_period.value: [
{om.start_period.value: 0, om.limit.value: _limit}
],
}

# Try each purpose/connector in order; optionally clear-by-id before setting
last_status = None
for cid, purpose in attempts:
try:
try:
tx_id = (
self.active_transaction_id
if (purpose == "TxProfile" and has_active)
else None
)
pid, _ = self._profile_ids_for(cid, purpose, tx_id=tx_id)
await self.call(call.ClearChargingProfile(id=pid))
except Exception:
pass

req = call.SetChargingProfile(
connector_id=cid, cs_charging_profiles=_mk_profile(purpose, cid)
)
resp = await self.call(req)
last_status = resp.status
if resp.status == ChargingProfileStatus.accepted:
_LOGGER.debug(
"SetChargingProfile accepted with purpose=%s connectorId=%s",
purpose,
cid,
)
return True
_LOGGER.debug(
"SetChargingProfile %s on connector %s -> %s",
purpose,
cid,
resp.status,
)
except Exception as ex:
_LOGGER.debug(
"SetChargingProfile %s on connector %s raised %s", purpose, cid, ex
)
# Try ChargePointMaxProfile (connectorId = 0)
try:
req = call.SetChargingProfile(
connector_id=0,
cs_charging_profiles={
om.charging_profile_id.value: 8,
om.stack_level.value: stack_level,
om.charging_profile_kind.value: ChargingProfileKindType.relative.value,
om.charging_profile_purpose.value: ChargingProfilePurposeType.charge_point_max_profile.value,
om.charging_schedule.value: _mk_schedule(units_value, limit_value),
},
)
resp = await self.call(req)
if resp.status == ChargingProfileStatus.accepted:
return True
_LOGGER.debug(
"ChargePointMaxProfile not accepted (%s); will try TxDefaultProfile.",
resp.status,
)
except Exception as ex:
_LOGGER.debug("ChargePointMaxProfile call raised: %s", ex)

if last_status is not None:
_LOGGER.warning("SetChargingProfile failed (last status=%s).", last_status)
# Fallback: TxDefaultProfile on target connector
# If no connector given, prefer 1 as a reasonable default.
target_cid = int(conn_id) if conn_id and int(conn_id) > 0 else 1
try:
# Some chargers are picky: try a slightly lower stack level if possible.
tx_stack = max(1, stack_level - 1)
req = call.SetChargingProfile(
connector_id=target_cid,
cs_charging_profiles={
om.charging_profile_id.value: 8,
om.stack_level.value: tx_stack,
om.charging_profile_kind.value: ChargingProfileKindType.relative.value,
om.charging_profile_purpose.value: ChargingProfilePurposeType.tx_default_profile.value,
om.charging_schedule.value: _mk_schedule(units_value, limit_value),
},
)
resp = await self.call(req)
if resp.status == ChargingProfileStatus.accepted:
return True
_LOGGER.warning("Set TxDefaultProfile rejected: %s", resp.status)
await self.notify_ha(
f"SetChargingProfile failed (last status={last_status})."
f"Warning: Set charging profile failed with response {resp.status}"
)
return False
return False
except Exception as ex:
_LOGGER.warning("Set TxDefaultProfile failed: %s", ex)
await self.notify_ha(f"Warning: Set charging profile failed: {ex}")
return False

async def set_availability(self, state: bool = True, connector_id: int | None = 0):
"""Change availability."""
Expand Down
32 changes: 0 additions & 32 deletions tests/test_additional_charge_point_v16.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,38 +414,6 @@ async def test_trigger_custom_message_unsupported_name(
await ws.close()


@pytest.mark.timeout(5)
@pytest.mark.parametrize(
"setup_config_entry",
[{"port": 9318, "cp_id": "CP_cov_profile_ids", "cms": "cms_services"}],
indirect=True,
)
@pytest.mark.parametrize("cp_id", ["CP_cov_profile_ids"])
@pytest.mark.parametrize("port", [9318])
async def test_profile_ids_for_bad_conn_id_cast(
hass, socket_enabled, cp_id, port, setup_config_entry
):
"""Test profile ids path when conn_id cast fails and conn_seg defaults to 1."""
cs = setup_config_entry
async with websockets.connect(
f"ws://127.0.0.1:{port}/{cp_id}", subprotocols=["ocpp1.6"]
) as ws:
cp = ChargePoint(f"{cp_id}_client", ws)
task = asyncio.create_task(cp.start())
try:
await cp.send_boot_notification()
await wait_ready(cs.charge_points[cp_id])
srv = cs.charge_points[cp_id]
pid, level = srv._profile_ids_for(conn_id="X", purpose="TxDefaultProfile")
# conn_seg should fall back to 1 -> pid = 1000 + 2 + (1*10) = 1012
assert (pid, level) == (1012, 1)
finally:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
await ws.close()


@pytest.mark.timeout(10)
@pytest.mark.parametrize(
"setup_config_entry",
Expand Down
Loading
Loading