11import bluetooth
22import logging
3+ from bluetooth import BluetoothSocket
4+ from logging import Logger
35
46class ConnectionManager :
5- INIT_CMD = "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00"
6- START_CMD = "04 00 04 00 17 00 00 00 10 00 10 00 08 A1 02 42 0B 08 0E 10 02 1A 05 01 40 9C 00 00"
7- STOP_CMD = "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00"
7+ INIT_CMD : str = "00 00 04 00 01 00 02 00 00 00 00 00 00 00 00 00"
8+ START_CMD : str = "04 00 04 00 17 00 00 00 10 00 10 00 08 A1 02 42 0B 08 0E 10 02 1A 05 01 40 9C 00 00"
9+ STOP_CMD : str = "04 00 04 00 17 00 00 00 10 00 11 00 08 7E 10 02 42 0B 08 4E 10 02 1A 05 01 00 00 00 00"
810
9- def __init__ (self , bt_addr = "28:2D:7F:C2:05:5B" , psm = 0x1001 , logger = None ):
10- self .bt_addr = bt_addr
11- self .psm = psm
12- self .logger = logger if logger else logging .getLogger (__name__ )
13- self .sock = None
14- self .connected = False
15- self .started = False
11+ def __init__ (self , bt_addr : str = "28:2D:7F:C2:05:5B" , psm : int = 0x1001 , logger : Logger = None ) -> None :
12+ self .bt_addr : str = bt_addr
13+ self .psm : int = psm
14+ self .logger : Logger = logger if logger else logging .getLogger (__name__ )
15+ self .sock : BluetoothSocket = None
16+ self .connected : bool = False
17+ self .started : bool = False
1618
17- def connect (self ):
19+ def connect (self ) -> bool :
1820 self .logger .info (f"Connecting to { self .bt_addr } on PSM { self .psm :#04x} ..." )
1921 try :
20- self .sock = bluetooth . BluetoothSocket (bluetooth .L2CAP )
22+ self .sock = BluetoothSocket (bluetooth .L2CAP )
2123 self .sock .connect ((self .bt_addr , self .psm ))
2224 self .connected = True
2325 self .logger .info ("Connected to AirPods." )
@@ -28,7 +30,7 @@ def connect(self):
2830 self .connected = False
2931 return self .connected
3032
31- def send_start (self ):
33+ def send_start (self ) -> bool :
3234 if not self .connected :
3335 self .logger .error ("Not connected. Cannot send START command." )
3436 return False
@@ -40,7 +42,7 @@ def send_start(self):
4042 self .logger .info ("START command has already been sent." )
4143 return True
4244
43- def send_stop (self ):
45+ def send_stop (self ) -> None :
4446 if self .connected and self .started :
4547 try :
4648 self .sock .send (bytes .fromhex (self .STOP_CMD ))
@@ -51,12 +53,12 @@ def send_stop(self):
5153 else :
5254 self .logger .info ("Cannot send STOP; not started or not connected." )
5355
54- def disconnect (self ):
56+ def disconnect (self ) -> None :
5557 if self .sock :
5658 try :
5759 self .sock .close ()
5860 self .logger .info ("Disconnected from AirPods." )
5961 except Exception as e :
6062 self .logger .error (f"Error during disconnect: { e } " )
6163 self .connected = False
62- self .started = False
64+ self .started = False
0 commit comments