diff --git a/logentries/__init__.py b/logentries/__init__.py index b64e423..e077f03 100644 --- a/logentries/__init__.py +++ b/logentries/__init__.py @@ -1 +1 @@ -from .utils import LogentriesHandler +from .utils import LogentriesHandler, LogentriesWHHandler diff --git a/logentries/utils.py b/logentries/utils.py index 1cdddff..53bf49a 100644 --- a/logentries/utils.py +++ b/logentries/utils.py @@ -15,12 +15,14 @@ import sys import certifi - +import requests # Size of the internal event queue QUEUE_SIZE = 32768 # Logentries API server address LE_API_DEFAULT = "data.logentries.com" + + # Port number for token logging to Logentries API server LE_PORT_DEFAULT = 80 LE_TLS_PORT_DEFAULT = 443 @@ -42,7 +44,6 @@ def dbg(msg): print(LE + msg) - class PlainTextSocketAppender(threading.Thread): def __init__(self, verbose=True, le_api=LE_API_DEFAULT, le_port=LE_PORT_DEFAULT, le_tls_port=LE_TLS_PORT_DEFAULT): threading.Thread.__init__(self) @@ -214,3 +215,105 @@ def emit(self, record): def close(self): logging.Handler.close(self) + + +LE_WH_API_DEFAULT = "http://eu.webhook.logs.insight.rapid7.com/v1/noformat/" +class LogentriesWHHandler(LogentriesHandler): + def __init__(self, token, verbose=True, format=None, le_url=LE_WH_API_DEFAULT): + logging.Handler.__init__(self) + self.token = token + self.good_config = True + self.verbose = verbose + # give the socket 10 seconds to flush, + # otherwise drop logs + self.timeout = 10 + if not le_helpers.check_token(token): + if self.verbose: + dbg(INVALID_TOKEN) + self.good_config = False + if format is None: + format = logging.Formatter('%(asctime)s : %(levelname)s, %(message)s', + '%a %b %d %H:%M:%S %Z %Y') + self.setFormatter(format) + self.setLevel(logging.DEBUG) + + self._thread = HTTPAppender(verbose=verbose, le_url=le_url+token, token=token) + + def flush(self): + # wait for all queued logs to be send + now = time.time() + while not self._thread.empty(): + time.sleep(0.2) + if time.time() - now > self.timeout: + break + + def emit(self, record): + if self.good_config and not self._thread.is_alive(): + try: + self._thread.start() + if self.verbose: + dbg("Starting Logentries Asynchronous HTTP Appender") + except RuntimeError: # It's already started. + pass + + msg = self.format(record).rstrip('\n') + + try: + self._thread._queue.put_nowait(msg) + except: + # Queue is full, try to remove the oldest message and put again + try: + self._thread._queue.get_nowait() + self._thread._queue.put_nowait(msg) + except: + # Race condition, no need for any action here + pass + + def close(self): + logging.Handler.close(self) + +import requests + +class HTTPAppender(threading.Thread): + def __init__(self, verbose=True, le_url=LE_WH_API_DEFAULT, token=None): + threading.Thread.__init__(self) + + self.daemon = True + self.verbose = verbose + self._queue = le_helpers.create_queue(QUEUE_SIZE) + self.le_url = le_url + self.token = token + + def empty(self): + return self._queue.empty() + + def run(self): + try: + + # Send data in queue + while True: + # Take data from queue + data = self._queue.get(block=True) + + # Replace newlines with Unicode line separator + # for multi-line events + if not le_helpers.is_unicode(data): + multiline = le_helpers.create_unicode(data).replace( + '\n', LINE_SEP) + else: + multiline = data.replace('\n', LINE_SEP) + multiline += "\n" + # Send data, reconnect if needed + while True: + try: + requests.post(self.le_url,data=multiline.encode('utf-8')) + except Exception: + try: + requests.post(self.le_url,data=multiline.encode('utf-8')) + except Exception as ex: + dbg("Failed to send log to logentries! {}".format(ex)) + pass + break + except KeyboardInterrupt: + if self.verbose: + dbg("Logentries asynchronous HTTP client interrupted") diff --git a/setup.py b/setup.py index d6fc755..59e4cc1 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name='Logentries', - version='0.8', + version='0.9', author='Mark Lacomber', author_email='marklacomber@gmail.com', packages=['logentries'], diff --git a/test_http.py b/test_http.py new file mode 100644 index 0000000..1beb1bf --- /dev/null +++ b/test_http.py @@ -0,0 +1,15 @@ +import unittest +import time + +class MyTestCase(unittest.TestCase): + def test_something(self): + import logentries + import logging + handler = logentries.LogentriesWHHandler("") + logging.root.addHandler(handler) + logging.warning("TEST 1 2 3") + time.sleep(60) + + +if __name__ == '__main__': + unittest.main()