-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsasl.py
More file actions
68 lines (49 loc) · 1.97 KB
/
sasl.py
File metadata and controls
68 lines (49 loc) · 1.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""SASL authentication for upstream IRC connections."""
from __future__ import annotations
import asyncio
import base64
import logging
from irc_parser import IRCMessage
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maximum length, in characters, of a single AUTHENTICATE payload chunk
# (per the IRC SASL spec). get_sasl_payload() splits longer base64 payloads
# into pieces of this size.
SASL_CHUNK_SIZE = 400
def build_plain_response(username: str, password: str, authzid: str = "") -> str:
    """Return the base64 text of a SASL PLAIN credential message.

    The message is ``authzid``, ``username`` and ``password`` (in that
    order) separated by NUL characters. It is encoded as UTF-8 and the
    resulting bytes are base64-encoded into an ASCII string.
    """
    message = "{}\0{}\0{}".format(authzid, username, password)
    raw_bytes = message.encode("utf-8")
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode("ascii")
def build_external_response() -> str:
    """Return the SASL EXTERNAL response, which is always the string ``"+"``."""
    external_response = "+"
    return external_response
async def perform_sasl(
    send_func,
    mechanism: str,
    username: str = "",
    password: str = "",
) -> None:
    """Announce the chosen SASL mechanism with an AUTHENTICATE command.

    A single ``AUTHENTICATE <mechanism>`` message is built and passed to
    the awaited ``send_func``. ``username`` and ``password`` are accepted
    but not used inside this function.

    The caller is responsible for handling 903/904/905 responses.
    """
    # Only the mechanism is announced here. The actual AUTHENTICATE payload
    # is sent later, once the server answers with AUTHENTICATE +; that step
    # is handled in upstream.py's message handler.
    request = IRCMessage(command="AUTHENTICATE", params=[mechanism])
    await send_func(request)
def get_sasl_payload(mechanism: str, username: str, password: str) -> list[str]:
    """Return the AUTHENTICATE payload strings for the given mechanism.

    ``mechanism`` is matched case-insensitively. PLAIN uses
    ``build_plain_response(username, password)``; EXTERNAL uses
    ``build_external_response()``. A response of ``"+"`` is returned as the
    single-element list ``["+"]``. Any other response is cut into
    consecutive pieces of at most ``SASL_CHUNK_SIZE`` characters, and a
    trailing ``"+"`` entry is added when the final piece is exactly
    ``SASL_CHUNK_SIZE`` characters long.

    Raises:
        ValueError: if ``mechanism`` is neither PLAIN nor EXTERNAL.
    """
    mech = mechanism.upper()
    if mech == "PLAIN":
        response = build_plain_response(username, password)
    elif mech == "EXTERNAL":
        response = build_external_response()
    else:
        raise ValueError(f"Unsupported SASL mechanism: {mechanism}")

    if response == "+":
        return ["+"]

    pieces = [
        response[start : start + SASL_CHUNK_SIZE]
        for start in range(0, len(response), SASL_CHUNK_SIZE)
    ]
    # A full-size final piece needs an explicit '+' entry to signal the end.
    if len(pieces[-1]) == SASL_CHUNK_SIZE:
        pieces.append("+")
    return pieces