-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcap.py
More file actions
164 lines (143 loc) · 4.93 KB
/
cap.py
File metadata and controls
164 lines (143 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""IRCv3 capability negotiation for both upstream and downstream connections."""
from __future__ import annotations
import enum
import logging
from typing import Callable, Optional
from irc_parser import IRCMessage
logger = logging.getLogger(__name__)

# IRCv3 capabilities the bouncer asks for on upstream (server-side)
# connections by default. Kept in alphabetical order; the container is a
# set, so ordering carries no meaning at runtime.
UPSTREAM_CAPS_WANTED = {
    "account-notify",
    "account-tag",
    "away-notify",
    "batch",
    "cap-notify",
    "chathistory",
    "chghost",
    "draft/chathistory",
    "draft/typing",
    "echo-message",
    "extended-join",
    "invite-notify",
    "labeled-response",
    "message-tags",
    "multi-prefix",
    "sasl",
    "server-time",
    "setname",
    "typing",
    "userhost-in-names",
}
# IRCv3 capabilities the bouncer offers to downstream (client-side)
# connections by default. Kept in alphabetical order; the container is a
# set, so ordering carries no meaning at runtime.
DOWNSTREAM_CAPS_AVAILABLE = {
    "account-notify",
    "account-tag",
    "away-notify",
    "batch",
    "cap-notify",
    "chghost",
    "draft/typing",
    "extended-join",
    "invite-notify",
    "labeled-response",
    "message-tags",
    "multi-prefix",
    "sasl",
    "server-time",
    "setname",
    "typing",
    "userhost-in-names",
}
# Capabilities that are only meaningful when the upstream server supports
# them too. After auth, the bouncer sends CAP DEL to downstream clients for
# every entry here that the upstream lacks.
UPSTREAM_REQUIRED_CAPS = {
    "account-notify",
    "account-tag",
    "away-notify",
    "chghost",
    "draft/typing",
    "extended-join",
    "invite-notify",
    "setname",
    "typing",
}
class CapState(enum.Enum):
    """Phase of a connection's capability negotiation."""

    # Negotiation is still in progress.
    NEGOTIATING = "negotiating"
    # Negotiation has finished.
    DONE = "done"
class CapNegotiator:
    """Track IRCv3 capability negotiation state for a single connection.

    The object records which capabilities the peer has advertised
    (``advertised``), which ones are currently enabled (``enabled``) and the
    overall negotiation phase (``state``). For upstream connections it also
    computes which capabilities should be requested; for downstream
    connections it can build the capability list to advertise.
    """

    def __init__(
        self, is_upstream: bool = True,
        extra_wanted: set[str] | None = None,
        override_caps: set[str] | None = None,
    ):
        """Create a negotiator.

        Args:
            is_upstream: True when this tracks a connection to an upstream
                server; False for a downstream client connection.
            extra_wanted: Additional capabilities to request on top of the
                base set (upstream only).
            override_caps: If not None, replaces ``UPSTREAM_CAPS_WANTED``
                as the base set of wanted capabilities.
        """
        self.is_upstream = is_upstream
        # Capability name -> advertised value, or None when the capability
        # was listed without an "=value" suffix.
        self.advertised: dict[str, str | None] = {}
        self.enabled: set[str] = set()
        self.state = CapState.NEGOTIATING
        self._extra_wanted = extra_wanted or set()
        self._override_caps = override_caps  # If set, replaces the defaults entirely

    @property
    def wanted(self) -> set[str]:
        """Capabilities this side would like to request.

        Upstream: the override set (or ``UPSTREAM_CAPS_WANTED`` when no
        override was given) united with the extra wanted caps. Downstream:
        always empty, because the bouncer advertises rather than requests.
        """
        if self.is_upstream:
            base = self._override_caps if self._override_caps is not None else UPSTREAM_CAPS_WANTED
            return base | self._extra_wanted
        return set()  # downstream: we don't request caps, we advertise them

    def get_caps_to_request(self) -> set[str]:
        """Return wanted caps that are advertised and not yet enabled."""
        available = set(self.advertised)
        # Parenthesised explicitly: "-" binds tighter than "&" in Python, so
        # the unparenthesised form is easy to misread.
        return (self.wanted & available) - self.enabled

    def handle_ls(self, cap_str: str) -> None:
        """Process a CAP LS response.

        Args:
            cap_str: Whitespace-separated capability tokens, each either
                ``name`` or ``name=value``. Entries are merged into
                ``advertised``; existing names are overwritten.
        """
        for token in cap_str.split():
            if "=" in token:
                # Split only on the first "=" so values may contain "=".
                name, value = token.split("=", 1)
                self.advertised[name] = value
            else:
                self.advertised[token] = None

    def handle_ack(self, cap_str: str) -> set[str]:
        """Process a CAP ACK response.

        Tokens prefixed with ``-`` are removed from ``enabled``; all other
        tokens are added to it.

        Returns:
            The capabilities that were added by this ACK.
        """
        newly_enabled = set()
        for token in cap_str.split():
            cap = token.lstrip("-")
            if token.startswith("-"):
                self.enabled.discard(cap)
            else:
                self.enabled.add(cap)
                newly_enabled.add(cap)
        return newly_enabled

    def handle_nak(self, cap_str: str) -> None:
        """Process a CAP NAK response by logging each rejected token."""
        for token in cap_str.split():
            logger.debug("CAP NAK: %s", token)

    def handle_new(self, cap_str: str) -> set[str]:
        """Process a CAP NEW notification.

        The newly advertised caps are recorded exactly like a CAP LS reply.

        Returns:
            Every cap that should now be requested (see
            ``get_caps_to_request``), not only those named in ``cap_str``.
        """
        self.handle_ls(cap_str)
        return self.get_caps_to_request()

    def handle_del(self, cap_str: str) -> None:
        """Process a CAP DEL notification.

        Each named capability is dropped from both ``advertised`` and
        ``enabled``; unknown names are ignored.
        """
        for token in cap_str.split():
            self.advertised.pop(token, None)
            self.enabled.discard(token)

    def build_advertise_string(
        self, upstream_enabled: set[str] | None = None,
        extra_caps: set[str] | None = None,
        downstream_override: set[str] | None = None,
    ) -> str:
        """Build the CAP LS response string for downstream clients.

        Args:
            upstream_enabled: If given, only advertise caps the upstream
                has enabled. "sasl" and "batch" are exempt from this filter
                when they are in the candidate set.
            extra_caps: Additional pass-through caps from the caps_wanted
                config; they are subject to the same upstream filter.
            downstream_override: If not None, completely replaces
                ``DOWNSTREAM_CAPS_AVAILABLE`` as the base set.

        Returns:
            The capability names, sorted and joined with single spaces.
            None of the argument sets is modified.
        """
        base = DOWNSTREAM_CAPS_AVAILABLE if downstream_override is None else downstream_override
        # Always work on a copy: the in-place union below must not leak
        # extra_caps into the caller's downstream_override set.
        caps = set(base)
        if extra_caps:
            caps |= extra_caps
        if upstream_enabled is not None:
            bouncer_only = {"sasl", "batch"}
            # extra_caps are already part of caps here, so this single
            # intersection also keeps the upstream-supported extras.
            caps = (caps & upstream_enabled) | (caps & bouncer_only)
        return " ".join(sorted(caps))

    def supports(self, cap: str) -> bool:
        """Check if a capability is currently enabled."""
        return cap in self.enabled