1
1
import asyncio
2
+ import socket
3
+ import time
4
+ import uuid
2
5
from typing import Any , Dict , Generator , IO , List
3
6
4
7
from aiohttp import ClientSession
@@ -19,12 +22,15 @@ def get_targets(targets: List[Dict[str, Dict[str, Any]]]) -> Generator[Monitorin
19
22
yield MonitoringTarget (** target )
20
23
21
24
22
- async def monitor (target : MonitoringTarget ) -> MonitoringResult :
25
+ async def monitor_http (target : MonitoringTarget ) -> MonitoringResult :
23
26
state : bool
24
27
async with ClientSession (
25
28
read_timeout = target .timeout ,
26
29
conn_timeout = target .timeout ) as session :
27
- method = getattr (session , target .method .lower ())
30
+ if target .method :
31
+ method = getattr (session , target .method .lower ())
32
+ else :
33
+ raise Exception ("Required method for monitoring http" )
28
34
error = ""
29
35
response = None
30
36
is_retry = False
@@ -75,6 +81,51 @@ async def monitor(target: MonitoringTarget) -> MonitoringResult:
75
81
response = await response .text () if response else error )
76
82
77
83
84
+ async def monitor_stun (target : MonitoringTarget ) -> MonitoringResult :
85
+ err = Exception ()
86
+ for i in range (target .retry ):
87
+ try :
88
+ sock = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
89
+ sock .setblocking (False )
90
+ host , port = target .url .split ("stun:" )[1 ].split (":" )
91
+ sock .bind (("0.0.0.0" , int (port )))
92
+ mtype = b"\x00 \x01 "
93
+ msg = b"\x00 \x01 \x00 \x00 \xff \xff \xff \xff "
94
+ tid = uuid .uuid4 ().bytes
95
+ length = bytearray ([len (msg )]).rjust (2 , b"\x00 " )
96
+ header = mtype + length + tid
97
+ sock .sendto (header + msg , (host , int (port )))
98
+ in_err = Exception ()
99
+ for j in range (int (target .timeout * 2 )):
100
+ time .sleep (0.5 )
101
+ try :
102
+ sock .recvfrom (1024 )
103
+ break
104
+ except Exception as e :
105
+ in_err = e
106
+ else :
107
+ raise in_err
108
+ break
109
+ except Exception as e :
110
+ logger .warning (("Monitor failed. "
111
+ f"Target: { target .url } " ))
112
+ err = e
113
+ time .sleep (target .retry_wait )
114
+ else :
115
+ return MonitoringResult (
116
+ expected_status_code = 0 ,
117
+ status_code = 0 ,
118
+ state = False ,
119
+ url = target .url ,
120
+ response = str (err ))
121
+ return MonitoringResult (
122
+ expected_status_code = 0 ,
123
+ status_code = 0 ,
124
+ state = True ,
125
+ url = target .url ,
126
+ response = "" )
127
+
128
+
78
129
async def watch (f : IO = None ) -> None :
79
130
if f is None :
80
131
f = open ("targets.yaml" )
@@ -85,7 +136,12 @@ async def watch(f: IO = None) -> None:
85
136
except Exception :
86
137
raise IncorrectYaml ()
87
138
for target in targets :
88
- task = asyncio .ensure_future (monitor (target ))
139
+ if target .url .startswith ("http://" ) or target .url .startswith ("https://" ):
140
+ task = asyncio .ensure_future (monitor_http (target ))
141
+ elif target .url .startswith ("stun:" ):
142
+ task = asyncio .ensure_future (monitor_stun (target ))
143
+ else :
144
+ logger .warning (f"Unsupported target type. target: { target .url } " )
89
145
monitors .append (task )
90
146
results = await asyncio .gather (* monitors )
91
147
for result in results :
0 commit comments