Skip to content

Commit a6265d9

Browse files
committed
Add explicit RTP protocol selection (IPv4 or IPv6).
1 parent e95cc28 commit a6265d9

File tree

3 files changed

+218
-38
lines changed

3 files changed

+218
-38
lines changed

sippy/Rtp/EPoint.py

Lines changed: 63 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import errno
55

66
from rtpsynth.RtpServer import RtpQueueFullError, RtpServer
7-
from sippy.misc import local4remote
87
from sippy.Time.MonoTime import MonoTime
98

109
from .Core.AudioChunk import AudioChunk
@@ -27,11 +26,12 @@ def __init__(self, rc:RTPConf, rtp_params:RTPParams, audio_in:callable,
2726
self.id = uuid4()
2827
self.rtp_params = rtp_params
2928
self.handlers = handlers or RTPHandlers()
29+
self._palloc = rc.palloc
3030
self._rtp_server = None
3131
self.state_lock = Lock()
3232
self.writer = None
3333
self.rsess = self.make_rtp_instream(rtp_params, audio_in)
34-
rserv_opts = self.make_udp_server_opts(rc, rtp_params)
34+
rserv_opts = self.make_udp_server_opts(rtp_params)
3535
self.rserv = self.make_udp_server(rserv_opts)
3636
if self.rtp_params.rtp_target is not None:
3737
self.writer_setup()
@@ -42,12 +42,9 @@ def make_writer(self, rtp_params:RTPParams):
4242
def make_rtp_instream(self, rtp_params:RTPParams, audio_in:callable):
4343
return self.handlers.rtp_instream_cls(rtp_params, audio_in)
4444

45-
def make_udp_server_opts(self, rc:RTPConf, rtp_params:RTPParams):
46-
if rtp_params.rtp_target is None:
47-
rtp_laddr = '0.0.0.0'
48-
else:
49-
rtp_laddr = local4remote(rtp_params.rtp_target[0])
50-
return (rtp_laddr, rc.palloc)
45+
def make_udp_server_opts(self, rtp_params:RTPParams):
46+
palloc = self._palloc if rtp_params.rtp_lport == 0 else rtp_params.rtp_lport
47+
return (rtp_params.rtp_laddr, palloc)
5148

5249
def make_udp_server(self, rserv_opts):
5350
rtp_laddr, palloc = rserv_opts
@@ -56,30 +53,10 @@ def make_udp_server(self, rserv_opts):
5653
channel = None
5754
self._rtp_server = rtp_server
5855
try:
59-
if callable(palloc):
60-
ntry = -1
61-
while True:
62-
ntry += 1
63-
bind_port = int(palloc(ntry))
64-
try:
65-
channel = rtp_server.create_channel(
66-
pkt_in=self.rtp_received,
67-
bind_host=rtp_laddr,
68-
bind_port=bind_port,
69-
)
70-
except OSError as ex:
71-
if ex.errno == errno.EADDRINUSE:
72-
continue
73-
raise
74-
break
75-
else:
76-
channel = rtp_server.create_channel(
77-
pkt_in=self.rtp_received,
78-
bind_host=rtp_laddr,
79-
bind_port=int(palloc),
80-
)
81-
if self.rtp_params.rtp_target is not None:
82-
channel.set_target(self.rtp_params.rtp_target[0], self.rtp_params.rtp_target[1])
56+
channel = self._bind_channel(rtp_server, rtp_laddr, palloc)
57+
target = self.rtp_params.rtp_target
58+
if target is not None:
59+
channel.set_target(target[0], target[1])
8360
return channel
8461
except Exception:
8562
release_rtp_server(rtp_server)
@@ -88,6 +65,51 @@ def make_udp_server(self, rserv_opts):
8865
channel.close()
8966
raise
9067

68+
def _create_channel(self, rtp_server:RtpServer, bind_host:str, bind_port:int):
69+
bind_family = self.rtp_params.rtp_family
70+
ch_kwargs = dict(pkt_in=self.rtp_received, bind_host=bind_host, bind_port=bind_port)
71+
ch = rtp_server.create_channel(bind_family=bind_family, **ch_kwargs)
72+
self.rtp_params.rtp_lport = bind_port
73+
return ch
74+
75+
def _bind_channel(self, rtp_server:RtpServer, rtp_laddr:str, palloc, preferred_port:int=None):
76+
if preferred_port is not None:
77+
try:
78+
return self._create_channel(rtp_server, rtp_laddr, preferred_port)
79+
except OSError as ex:
80+
if ex.errno != errno.EADDRINUSE:
81+
raise
82+
if callable(palloc):
83+
ntry = -1
84+
while True:
85+
ntry += 1
86+
bind_port = int(palloc(ntry))
87+
try:
88+
return self._create_channel(rtp_server, rtp_laddr, bind_port)
89+
except OSError as ex:
90+
if ex.errno == errno.EADDRINUSE:
91+
continue
92+
raise
93+
return self._create_channel(rtp_server, rtp_laddr, int(palloc))
94+
95+
def _swap_channel(self, old_channel, rtp_params:RTPParams):
96+
with self.state_lock:
97+
rtp_server = self._rtp_server
98+
if rtp_server is None or self.rserv is not old_channel:
99+
return
100+
rtp_laddr, palloc = self.make_udp_server_opts(rtp_params)
101+
preferred_port = old_channel.local_addr[1]
102+
new_channel = self._bind_channel(rtp_server, rtp_laddr, palloc, preferred_port=preferred_port)
103+
target = rtp_params.rtp_target
104+
if target is not None:
105+
new_channel.set_target(target[0], target[1])
106+
with self.state_lock:
107+
if self._rtp_server is None or self.rserv is not old_channel:
108+
new_channel.close()
109+
return
110+
self.rserv = new_channel
111+
old_channel.close()
112+
91113
def writer_setup(self):
92114
assert self.writer is None
93115
writer = self.make_writer(self.rtp_params)
@@ -124,8 +146,11 @@ def update(self, rtp_params:RTPParams):
124146
old_writer = None
125147
need_new_writer = False
126148
target_changed = False
149+
proto_changed = False
127150
with self.state_lock:
128151
target_changed = self.rtp_params.rtp_target != rtp_params.rtp_target
152+
proto_changed = self.rtp_params.rtp_proto != rtp_params.rtp_proto
153+
self.rtp_params.rtp_proto = rtp_params.rtp_proto
129154
self.rtp_params.rtp_target = rtp_params.rtp_target
130155
ptime_changed = self.rtp_params.out_ptime != rtp_params.out_ptime
131156
self.rtp_params.out_ptime = rtp_params.out_ptime
@@ -137,15 +162,18 @@ def update(self, rtp_params:RTPParams):
137162
self.writer = None
138163
elif self.writer is None:
139164
need_new_writer = True
140-
elif ptime_changed:
165+
elif ptime_changed or proto_changed:
141166
old_writer = self.writer
142167
self.writer = None
143168
need_new_writer = True
144-
if target_changed and channel is not None and self.rtp_params.rtp_target is not None:
145-
channel.set_target(rtp_params.rtp_target[0], rtp_params.rtp_target[1])
146169
if old_writer is not None:
147170
old_writer.end()
148171
old_writer.join()
172+
if proto_changed and channel is not None:
173+
self._swap_channel(channel, rtp_params)
174+
elif target_changed and channel is not None and self.rtp_params.rtp_target is not None:
175+
target = self.rtp_params.rtp_target
176+
channel.set_target(target[0], target[1])
149177
if need_new_writer:
150178
new_writer = self.make_writer(rtp_params)
151179
new_writer.set_pkt_send_f(self.send_pkt)

sippy/Rtp/Params.py

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,104 @@
1+
from functools import lru_cache
2+
from ipaddress import ip_address
13
from typing import Optional, Tuple, Type, Union
4+
from socket import AF_INET, AF_INET6
5+
6+
from sippy.misc import local4remote
27

38
from .Codecs.G711 import G711Codec
49
from .Codecs.G722 import G722Codec
510

11+
12+
@lru_cache(maxsize=256)
13+
def canonicalize_rtp_host(host: str) -> str:
14+
if host.startswith('[') and host.endswith(']'):
15+
host = host[1:-1]
16+
try:
17+
return str(ip_address(host))
18+
except ValueError:
19+
return host
20+
21+
22+
def canonicalize_rtp_target(rtp_target: Optional[Tuple[str, int]]) -> Optional[Tuple[str, int]]:
23+
if rtp_target is None:
24+
return None
25+
host, port = rtp_target
26+
assert isinstance(host, str)
27+
return (canonicalize_rtp_host(host), int(port))
28+
29+
30+
def canonicalize_rtp_address(address) -> Optional[Tuple[str, int]]:
31+
if not isinstance(address, tuple) or len(address) < 2:
32+
return None
33+
host, port = address[:2]
34+
if not isinstance(host, str):
35+
return None
36+
return (canonicalize_rtp_host(host), int(port))
37+
38+
639
class RTPParams():
7-
rtp_target: Optional[Tuple[str, int]]
40+
_rtp_target: Optional[Tuple[str, int]]
41+
_rtp_proto: str
42+
rtp_laddr: str
43+
rtp_lport: int = 0
844
out_ptime: int
945
out_sr: int
46+
default_rtp_proto: str = 'IPV4'
1047
default_ptime: int = 20
1148
default_sr: int = 16000
1249
codec: Type[Union[G711Codec, G722Codec]]
50+
1351
def __init__(self, rtp_target:Optional[Tuple[str, int]], out_ptime:int=default_ptime,
14-
out_sr:int=default_sr):
15-
assert rtp_target is None or (isinstance(rtp_target, tuple) and len(rtp_target) == 2)
52+
out_sr:int=default_sr, rtp_proto:str=default_rtp_proto):
53+
self._rtp_target = None
54+
self._rtp_proto = self.default_rtp_proto
55+
self.rtp_laddr = '0.0.0.0'
56+
self.rtp_proto = rtp_proto
1657
self.rtp_target = rtp_target
1758
self.out_ptime = out_ptime
1859
self.out_sr = out_sr
60+
61+
@property
62+
def rtp_target(self) -> Optional[Tuple[str, int]]:
63+
return self._rtp_target
64+
65+
@rtp_target.setter
66+
def rtp_target(self, rtp_target: Optional[Tuple[str, int]]):
67+
assert rtp_target is None or (isinstance(rtp_target, tuple) and len(rtp_target) == 2)
68+
# Fast path for common "set to current value" updates.
69+
if rtp_target == self._rtp_target:
70+
return
71+
new_rtp_target = canonicalize_rtp_target(rtp_target)
72+
if new_rtp_target == self._rtp_target:
73+
return
74+
self._rtp_target = new_rtp_target
75+
self.rtp_laddr = self._get_laddr()
76+
77+
@property
78+
def rtp_proto(self) -> str:
79+
return self._rtp_proto
80+
81+
@rtp_proto.setter
82+
def rtp_proto(self, rtp_proto: str):
83+
assert isinstance(rtp_proto, str)
84+
rtp_proto = rtp_proto.upper()
85+
assert rtp_proto in ('IPV4', 'IPV6')
86+
if rtp_proto == self._rtp_proto:
87+
return
88+
self._rtp_proto = rtp_proto
89+
self.rtp_laddr = self._get_laddr()
90+
91+
@property
92+
def rtp_family(self):
93+
return AF_INET if self.rtp_proto == 'IPV4' else AF_INET6
94+
95+
def _get_laddr(self):
96+
af = self.rtp_family
97+
target = self.rtp_target
98+
if target is None:
99+
rtp_laddr = '0.0.0.0' if af == AF_INET else '::'
100+
else:
101+
rtp_laddr = local4remote(target[0], family=af)
102+
if af == AF_INET6:
103+
rtp_laddr = rtp_laddr[1:-1]
104+
return rtp_laddr

tests/test_RtpParams.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import unittest
2+
from socket import AF_INET, AF_INET6
3+
from unittest.mock import patch
4+
5+
from sippy.Rtp.Params import RTPParams, canonicalize_rtp_address, canonicalize_rtp_target
6+
7+
8+
class TestRtpParams(unittest.TestCase):
9+
def test_canonicalize_ipv6_expanded_target(self):
10+
target = canonicalize_rtp_target(('0:0:0:0:0:0:0:1', 14086))
11+
self.assertEqual(target, ('::1', 14086))
12+
13+
def test_canonicalize_bracketed_ipv6_target(self):
14+
target = canonicalize_rtp_target(('[::1]', '13998'))
15+
self.assertEqual(target, ('::1', 13998))
16+
17+
def test_canonicalize_incoming_ipv6_address_tuple(self):
18+
source = canonicalize_rtp_address(('0:0:0:0:0:0:0:1', 14086, 0, 0))
19+
self.assertEqual(source, ('::1', 14086))
20+
21+
def test_params_store_canonical_target(self):
22+
a = RTPParams(('::1', 14086), rtp_proto='IPV6')
23+
b = RTPParams(('0:0:0:0:0:0:0:1', 14086), rtp_proto='IPV6')
24+
self.assertEqual(a.rtp_target, b.rtp_target)
25+
26+
def test_rtp_proto_setter_normalizes_and_updates_laddr(self):
27+
params = RTPParams(None, rtp_proto='IPV4')
28+
self.assertEqual(params.rtp_laddr, '0.0.0.0')
29+
params.rtp_proto = 'ipv6'
30+
self.assertEqual(params.rtp_proto, 'IPV6')
31+
self.assertEqual(params.rtp_laddr, '::')
32+
33+
def test_rtp_target_setter_canonicalizes_and_updates_laddr(self):
34+
params = RTPParams(None, rtp_proto='IPV6')
35+
with patch('sippy.Rtp.Params.local4remote', return_value='[2001:db8::1]') as mock_l4r:
36+
params.rtp_target = ('0:0:0:0:0:0:0:1', '14086')
37+
mock_l4r.assert_called_once_with('::1', family=AF_INET6)
38+
self.assertEqual(params.rtp_target, ('::1', 14086))
39+
self.assertEqual(params.rtp_laddr, '2001:db8::1')
40+
41+
def test_rtp_target_setter_skips_canonicalize_and_laddr_on_unchanged_value(self):
42+
params = RTPParams(None, rtp_proto='IPV6')
43+
with patch('sippy.Rtp.Params.canonicalize_rtp_target',
44+
side_effect=AssertionError('canonicalize_rtp_target should not be called')):
45+
with patch.object(params, '_get_laddr', side_effect=AssertionError('_get_laddr should not be called')):
46+
params.rtp_target = None
47+
48+
def test_rtp_proto_setter_skips_laddr_on_unchanged_value(self):
49+
params = RTPParams(None, rtp_proto='IPV4')
50+
with patch.object(params, '_get_laddr', side_effect=AssertionError('_get_laddr should not be called')):
51+
params.rtp_proto = 'ipv4'
52+
53+
def test_rtp_family_property_maps_proto(self):
54+
params = RTPParams(None, rtp_proto='IPV4')
55+
self.assertEqual(params.rtp_family, AF_INET)
56+
params.rtp_proto = 'IPV6'
57+
self.assertEqual(params.rtp_family, AF_INET6)
58+
59+
def test_rtp_family_property_is_read_only(self):
60+
params = RTPParams(None, rtp_proto='IPV4')
61+
with self.assertRaises(AttributeError):
62+
params.rtp_family = AF_INET6
63+
64+
65+
if __name__ == '__main__':
66+
unittest.main()

0 commit comments

Comments
 (0)