From c61745ed1e30586914d970296ec6099768227893 Mon Sep 17 00:00:00 2001 From: David <35256944+davidonna@users.noreply.github.com> Date: Wed, 31 Oct 2018 12:04:18 +0100 Subject: [PATCH 01/31] Backport nanosecond_precision support from fluent/fluent-logger-python (#3) --- CHANGELOG.rst | 4 ++-- aiofluent/handler.py | 3 +++ aiofluent/sender.py | 28 ++++++++++++++++++++++++---- setup.py | 2 +- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c8777a0..736326c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,8 +1,8 @@ 1.2.1 (unreleased) ------------------ -- Nothing changed yet. - +- Add support for nanosecond precision timestamps + [davidonna] 1.2.0 (2018-06-14) ------------------ diff --git a/aiofluent/handler.py b/aiofluent/handler.py index e47233c..b8f94b0 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -127,12 +127,15 @@ def __init__(self, timeout=3, verbose=False, loop=None, + nanosecond_precision=False, **kwargs): self.loop = loop self.tag = tag + self.nanosecond_precision = nanosecond_precision self.sender = sender.FluentSender(tag, host=host, port=port, timeout=timeout, verbose=verbose, + nanosecond_precision=self.nanosecond_precision, **kwargs) logging.Handler.__init__(self) diff --git a/aiofluent/sender.py b/aiofluent/sender.py index 5b2ac74..52abd01 100644 --- a/aiofluent/sender.py +++ b/aiofluent/sender.py @@ -5,6 +5,7 @@ import sys import time import traceback +import struct _global_sender = None @@ -42,6 +43,17 @@ async def connection_factory(sender): sender.last_error = ex +class EventTime(msgpack.ExtType): + def __new__(cls, timestamp): + seconds = int(timestamp) + nanoseconds = int(timestamp % 1 * 10 ** 9) + return super(EventTime, cls).__new__( + cls, + code=0, + data=struct.pack(">II", seconds, nanoseconds), + ) + + class FluentSender(object): def __init__(self, tag, @@ -53,6 +65,7 @@ def __init__(self, buffer_overflow_handler=None, retry_timeout=30, connection_factory=connection_factory, + nanosecond_precision=False, **kwargs): self._tag = tag @@ -62,6 +75,7 @@ def __init__(self, self._timeout = timeout self._verbose = verbose self._buffer_overflow_handler = buffer_overflow_handler + self._nanosecond_precision = nanosecond_precision self._pendings = None self._reader = None @@ -94,11 +108,17 @@ async def get_writer(self): return self._writer async def async_emit(self, label, data, timestamp=None): - if timestamp is None: - timestamp = int(time.time()) - return await self.async_emit_with_time(label, timestamp, data) + if timestamp is not None: + ev_time = timestamp + elif self._nanosecond_precision: + ev_time = EventTime(time.time()) + else: + ev_time = int(time.time()) + return await self.async_emit_with_time(label, ev_time, data) async def async_emit_with_time(self, label, timestamp, data): + if self._nanosecond_precision and isinstance(timestamp, float): + timestamp = EventTime(timestamp) try: bytes_ = self._make_packet(label, timestamp, data) except Exception as e: @@ -170,7 +190,7 @@ def _call_buffer_overflow_handler(self, pending_events): try: if self._buffer_overflow_handler: self._buffer_overflow_handler(pending_events) - except Exception as e: + except Exception: # User should care any exception in handler pass diff --git a/setup.py b/setup.py index 6c1cfe7..45746bc 100755 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ 'pytest-asyncio>=0.8.0', 'pytest-aiohttp', 'pytest-cov', - 'coverage==4.0.3' + 'coverage' ] }, test_suite='tests' From 0ac2f712adefccf9549eb989ef7885ab5cb108d5 Mon Sep 17 00:00:00 2001 From: vangheem Date: Wed, 31 Oct 2018 07:04:56 -0400 Subject: [PATCH 02/31] Preparing release 1.2.1 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 736326c..9ed6a04 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.1 (unreleased) +1.2.1 (2018-10-31) ------------------ - Add support for nanosecond precision timestamps diff --git a/setup.py b/setup.py index 45746bc..96b0ede 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.1.dev0', + version='1.2.1', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From be26ddee3a020122ebcefbb6866463210d71ce39 Mon Sep 17 00:00:00 2001 From: vangheem Date: Wed, 31 Oct 2018 07:05:04 -0400 Subject: [PATCH 03/31] Back to development: 1.2.2 --- CHANGELOG.rst | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 9ed6a04..5978138 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +1.2.2 (unreleased) +------------------ + +- Nothing changed yet. + + 1.2.1 (2018-10-31) ------------------ diff --git a/setup.py b/setup.py index 96b0ede..d044d63 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.1', + version='1.2.2.dev0', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From f7310ef7e69af66a23abba1e9e3f45a93c0a21dd Mon Sep 17 00:00:00 2001 From: David Ferlier Date: Thu, 8 Nov 2018 17:40:59 +0100 Subject: [PATCH 04/31] nanosec_precision by default, revbump to 1.2.2 --- CHANGELOG.rst | 4 ++-- aiofluent/event.py | 4 ++-- aiofluent/handler.py | 4 ++-- aiofluent/sender.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5978138..ccb308f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,8 +1,8 @@ 1.2.2 (unreleased) ------------------ -- Nothing changed yet. - +- nanosecond_precision by default + [davidonna] 1.2.1 (2018-10-31) ------------------ diff --git a/aiofluent/event.py b/aiofluent/event.py index 6ac9084..988452b 100644 --- a/aiofluent/event.py +++ b/aiofluent/event.py @@ -11,7 +11,7 @@ def __init__(self, label, data, **kwargs): self.label = label self.data = data self.sender_ = kwargs.get('sender', sender.get_global_sender()) - self.timestamp = kwargs.get('time', int(time.time())) + self.timestamp = kwargs.get('time', time.time()) async def __call__(self): await self.sender_.async_emit_with_time( @@ -21,5 +21,5 @@ async def __call__(self): async def send_event(label, data, **kwargs): assert isinstance(data, dict), 'data must be a dict' sender_ = kwargs.get('sender', sender.get_global_sender()) - timestamp = kwargs.get('time', int(time.time())) + timestamp = kwargs.get('time', time.time()) await sender_.async_emit_with_time(label, timestamp, data) diff --git a/aiofluent/handler.py b/aiofluent/handler.py index b8f94b0..5a19645 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -91,7 +91,7 @@ def __init__(self): async def consume_queue(self, initial_record, handler): self._queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) - self._queue.put_nowait((initial_record, handler, int(time.time()))) + self._queue.put_nowait((initial_record, handler, time.time())) while True: record, handler, timestamp = await self._queue.get() try: @@ -152,7 +152,7 @@ def emit(self, record): 'No event loop running to send log to fluentd') else: try: - FluentHandler._queue.put_nowait((record, self, int(time.time()))) + FluentHandler._queue.put_nowait((record, self, time.time())) except asyncio.QueueFull: sys.stderr.write( f'Hit max log queue size({MAX_QUEUE_SIZE}), ' diff --git a/aiofluent/sender.py b/aiofluent/sender.py index 52abd01..46cd8de 100644 --- a/aiofluent/sender.py +++ b/aiofluent/sender.py @@ -65,7 +65,7 @@ def __init__(self, buffer_overflow_handler=None, retry_timeout=30, connection_factory=connection_factory, - nanosecond_precision=False, + nanosecond_precision=True, **kwargs): self._tag = tag From d026f8fbe598ba0990b8facbd96c9215c9543ded Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 1 Apr 2019 10:30:46 -0400 Subject: [PATCH 05/31] Preparing release 1.2.2 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ccb308f..358950e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.2 (unreleased) +1.2.2 (2019-04-01) ------------------ - nanosecond_precision by default diff --git a/setup.py b/setup.py index d044d63..8d45637 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.2.dev0', + version='1.2.2', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From f6a5148ca6260dc4d95ff4e7ce06e42e167b4823 Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 1 Apr 2019 10:30:53 -0400 Subject: [PATCH 06/31] Back to development: 1.2.3 --- CHANGELOG.rst | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 358950e..0384581 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +1.2.3 (unreleased) +------------------ + +- Nothing changed yet. + + 1.2.2 (2019-04-01) ------------------ diff --git a/setup.py b/setup.py index 8d45637..2099218 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.2', + version='1.2.3.dev0', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From 340d45caa4a76bce4fc2aa68879c26023d96f3eb Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 1 Apr 2019 10:34:58 -0400 Subject: [PATCH 07/31] tweak test req --- setup.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 2099218..5fee771 100755 --- a/setup.py +++ b/setup.py @@ -31,11 +31,10 @@ ], extras_require={ 'test': [ - 'pytest<=3.1.0', - 'pytest-asyncio>=0.8.0', - 'pytest-aiohttp', + 'pytest>=3.8.0', + 'pytest-asyncio>=0.10.0', 'pytest-cov', - 'coverage' + 'coverage>=4.0.3', ] }, test_suite='tests' From 22dfea073d455f05fa2333e37918ed4e6adaa83f Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 1 Apr 2019 10:38:02 -0400 Subject: [PATCH 08/31] bump --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0384581..8770856 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ 1.2.3 (unreleased) ------------------ -- Nothing changed yet. +- Fix release 1.2.2 (2019-04-01) From 8d5318f3db82865d938ba8b36606e26e85391f4c Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 1 Apr 2019 10:38:07 -0400 Subject: [PATCH 09/31] Preparing release 1.2.3 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 8770856..e72b4c6 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.3 (unreleased) +1.2.3 (2019-04-01) ------------------ - Fix release diff --git a/setup.py b/setup.py index 5fee771..9d5da6c 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.3.dev0', + version='1.2.3', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From 8f6303cd636723400149df69a365d9944185252a Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 1 Apr 2019 10:38:19 -0400 Subject: [PATCH 10/31] Back to development: 1.2.4 --- CHANGELOG.rst | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e72b4c6..692909c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +1.2.4 (unreleased) +------------------ + +- Nothing changed yet. + + 1.2.3 (2019-04-01) ------------------ diff --git a/setup.py b/setup.py index 9d5da6c..605c53b 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.3', + version='1.2.4.dev0', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From 922e214bbe010424cb734322992e918cc0f0536b Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 19 Dec 2019 14:15:15 -0500 Subject: [PATCH 11/31] increase max queue size --- CHANGELOG.rst | 2 +- aiofluent/handler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 692909c..98bf479 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ 1.2.4 (unreleased) ------------------ -- Nothing changed yet. +- Increase max queue size 1.2.3 (2019-04-01) diff --git a/aiofluent/handler.py b/aiofluent/handler.py index 5a19645..4e65574 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -81,7 +81,7 @@ def _add_dic(data, dic): data[str(key)] = value -MAX_QUEUE_SIZE = 100 +MAX_QUEUE_SIZE = 500 class LogQueue: From 0bc5b9a62f03f50d37a3accc4ed7d18a69323fe8 Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 19 Dec 2019 14:15:32 -0500 Subject: [PATCH 12/31] better msg --- aiofluent/handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiofluent/handler.py b/aiofluent/handler.py index 4e65574..c0bc779 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -155,7 +155,7 @@ def emit(self, record): FluentHandler._queue.put_nowait((record, self, time.time())) except asyncio.QueueFull: sys.stderr.write( - f'Hit max log queue size({MAX_QUEUE_SIZE}), ' + f'Fluentd hit max log queue size({MAX_QUEUE_SIZE}), ' 'discarding message') except AttributeError: sys.stderr.write('Error sending async fluentd message') From 5ca79a875872a3186e7cfbfa66f20579d4ba8ae2 Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 19 Dec 2019 14:15:53 -0500 Subject: [PATCH 13/31] Preparing release 1.2.4 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 98bf479..1f68619 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.4 (unreleased) +1.2.4 (2019-12-19) ------------------ - Increase max queue size diff --git a/setup.py b/setup.py index 605c53b..37e556e 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.4.dev0', + version='1.2.4', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From f8e81016e7fa837cb1601c6f8867d26bb8736bab Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 19 Dec 2019 14:16:47 -0500 Subject: [PATCH 14/31] Back to development: 1.2.5 --- CHANGELOG.rst | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1f68619..013dc98 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +1.2.5 (unreleased) +------------------ + +- Nothing changed yet. + + 1.2.4 (2019-12-19) ------------------ diff --git a/setup.py b/setup.py index 37e556e..f0f6dba 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.4', + version='1.2.5.dev0', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From ebc30075a67f93bed71a01013119512a2735e63f Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 19 Dec 2019 20:34:30 -0500 Subject: [PATCH 15/31] Handle event loop closed error --- CHANGELOG.rst | 3 ++- aiofluent/handler.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 013dc98..6b566f8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,8 @@ 1.2.5 (unreleased) ------------------ -- Nothing changed yet. +- Handle event loop closed error + [vangheem] 1.2.4 (2019-12-19) diff --git a/aiofluent/handler.py b/aiofluent/handler.py index c0bc779..b684915 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -153,6 +153,8 @@ def emit(self, record): else: try: FluentHandler._queue.put_nowait((record, self, time.time())) + except RuntimeError: + sys.stderr.write("RuntimeError, likely event loop closing") except asyncio.QueueFull: sys.stderr.write( f'Fluentd hit max log queue size({MAX_QUEUE_SIZE}), ' From d1eb3b85bbddf819f5fcb2ebc454a9008b69d3c1 Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 19 Dec 2019 20:34:38 -0500 Subject: [PATCH 16/31] Preparing release 1.2.5 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6b566f8..dad2937 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.5 (unreleased) +1.2.5 (2019-12-19) ------------------ - Handle event loop closed error diff --git a/setup.py b/setup.py index f0f6dba..2830f20 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.5.dev0', + version='1.2.5', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From f1e0e5724ab89ea2538f5367b752e2de2de8a8ed Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 19 Dec 2019 20:34:55 -0500 Subject: [PATCH 17/31] Back to development: 1.2.6 --- CHANGELOG.rst | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index dad2937..1cc7259 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +1.2.6 (unreleased) +------------------ + +- Nothing changed yet. + + 1.2.5 (2019-12-19) ------------------ diff --git a/setup.py b/setup.py index 2830f20..3905004 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.5', + version='1.2.6.dev0', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From cd28780e50b58508aa057730b78bc822873b032d Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 6 Jan 2020 10:20:36 -0500 Subject: [PATCH 18/31] Improve error logging --- CHANGELOG.rst | 4 ++-- aiofluent/handler.py | 13 ++++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1cc7259..2c538d6 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,8 +1,8 @@ 1.2.6 (unreleased) ------------------ -- Nothing changed yet. - +- Improve error logging + [vangheem] 1.2.5 (2019-12-19) ------------------ diff --git a/aiofluent/handler.py b/aiofluent/handler.py index b684915..d60f901 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -8,6 +8,7 @@ import socket import sys import time +import traceback class FluentRecordFormatter(logging.Formatter, object): @@ -98,7 +99,9 @@ async def consume_queue(self, initial_record, handler): await handler.async_emit(record, timestamp) except: # noqa sys.stderr.write( - f'Error processing log') + 'Error processing log\n{}\n'.format( + traceback.format_exc() + )) finally: self._queue.task_done() @@ -149,18 +152,18 @@ def emit(self, record): FluentHandler._queue.consume_queue(record, self), loop=self.loop) except RuntimeError: sys.stderr.write( - 'No event loop running to send log to fluentd') + 'No event loop running to send log to fluentd\n') else: try: FluentHandler._queue.put_nowait((record, self, time.time())) except RuntimeError: - sys.stderr.write("RuntimeError, likely event loop closing") + sys.stderr.write("RuntimeError, likely event loop closing\n") except asyncio.QueueFull: sys.stderr.write( f'Fluentd hit max log queue size({MAX_QUEUE_SIZE}), ' - 'discarding message') + 'discarding message\n') except AttributeError: - sys.stderr.write('Error sending async fluentd message') + sys.stderr.write('Error sending async fluentd message\n') async def async_emit(self, record, timestamp=None): data = self.format(record) From 8f5cbe77867f93927edf1d3da41317552a94a16b Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 6 Jan 2020 10:20:42 -0500 Subject: [PATCH 19/31] Preparing release 1.2.6 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2c538d6..81c6547 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.6 (unreleased) +1.2.6 (2020-01-06) ------------------ - Improve error logging diff --git a/setup.py b/setup.py index 3905004..cbe82d3 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.6.dev0', + version='1.2.6', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From 0f92cd0000622fcaec598120178eb70b656e868e Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 6 Jan 2020 10:20:50 -0500 Subject: [PATCH 20/31] Back to development: 1.2.7 --- CHANGELOG.rst | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 81c6547..a199203 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +1.2.7 (unreleased) +------------------ + +- Nothing changed yet. + + 1.2.6 (2020-01-06) ------------------ diff --git a/setup.py b/setup.py index cbe82d3..7b17860 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.6', + version='1.2.7.dev0', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From 0a41ba7961749c96867523caad2d69d1d07fab2a Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 9 Mar 2020 08:37:57 -0400 Subject: [PATCH 21/31] fix location to aiofluent --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 7b17860..b7a6f2b 100755 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ install_requires=['msgpack-python'], author='Nathan Van Gheem', author_email='vangheem@gmail.com', - url='https://github.com/onna/aiofluent', + url='https://github.com/guillotinaweb/aiofluent', download_url='http://pypi.python.org/pypi/aiofluent/', license='Apache License, Version 2.0', classifiers=[ From d3c56c4f5cee0199395b517f58a4498c006b959d Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 9 Mar 2020 08:38:11 -0400 Subject: [PATCH 22/31] cl --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a199203..bcfaf5f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ 1.2.7 (unreleased) ------------------ -- Nothing changed yet. +- Fix repo location 1.2.6 (2020-01-06) From 867ffcf2c85ee5fc44c70f0b0c5ab8fdede24601 Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 9 Mar 2020 08:38:15 -0400 Subject: [PATCH 23/31] Preparing release 1.2.7 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bcfaf5f..b76d19d 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.7 (unreleased) +1.2.7 (2020-03-09) ------------------ - Fix repo location diff --git a/setup.py b/setup.py index b7a6f2b..e666e4b 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.7.dev0', + version='1.2.7', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From 207823666a3ce31fa65848767350604b62f08e01 Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 9 Mar 2020 08:38:23 -0400 Subject: [PATCH 24/31] Back to development: 1.2.8 --- CHANGELOG.rst | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b76d19d..c12bf61 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +1.2.8 (unreleased) +------------------ + +- Nothing changed yet. + + 1.2.7 (2020-03-09) ------------------ diff --git a/setup.py b/setup.py index e666e4b..8e1d0c3 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.7', + version='1.2.8.dev0', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From e69ca178a64729a1237e178c84cb6256c984925d Mon Sep 17 00:00:00 2001 From: vangheem Date: Fri, 15 May 2020 14:46:43 -0400 Subject: [PATCH 25/31] handle TypeError --- CHANGELOG.rst | 2 +- aiofluent/handler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c12bf61..64d26a1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ 1.2.8 (unreleased) ------------------ -- Nothing changed yet. +- Handle TypeError formatting log data 1.2.7 (2020-03-09) diff --git a/aiofluent/handler.py b/aiofluent/handler.py index d60f901..9885352 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -42,7 +42,7 @@ def format(self, record): for key, value in self._fmt_dict.items(): try: data[key] = value % record.__dict__ - except KeyError: + except (KeyError, TypeError): # we are okay with missing values here... pass From f62342047c1250e19788136d4dbac65d33933e6d Mon Sep 17 00:00:00 2001 From: vangheem Date: Fri, 15 May 2020 14:46:55 -0400 Subject: [PATCH 26/31] Preparing release 1.2.8 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 64d26a1..a6c1466 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.8 (unreleased) +1.2.8 (2020-05-15) ------------------ - Handle TypeError formatting log data diff --git a/setup.py b/setup.py index 8e1d0c3..36eee00 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.8.dev0', + version='1.2.8', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From 93afff33132dd200bc661d55202dd288beb4cf60 Mon Sep 17 00:00:00 2001 From: vangheem Date: Fri, 15 May 2020 14:47:03 -0400 Subject: [PATCH 27/31] Back to development: 1.2.9 --- CHANGELOG.rst | 6 ++++++ setup.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a6c1466..eb7c6a4 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +1.2.9 (unreleased) +------------------ + +- Nothing changed yet. + + 1.2.8 (2020-05-15) ------------------ diff --git a/setup.py b/setup.py index 36eee00..d530550 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.8', + version='1.2.9.dev0', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From f1147e71592f827d75aedcfa778856da9b9fbcd3 Mon Sep 17 00:00:00 2001 From: Nathan Van Gheem Date: Thu, 22 Oct 2020 15:19:44 -0400 Subject: [PATCH 28/31] Only log errors every 30 seconds (#5) --- CHANGELOG.rst | 2 +- aiofluent/handler.py | 16 +++++--- aiofluent/sender.py | 4 +- tests/fixtures.py | 2 +- tests/test_handler.py | 85 ++++++++++++++++++++++++++++++++----------- tests/test_sender.py | 1 - 6 files changed, 78 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index eb7c6a4..da3cbb3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ 1.2.9 (unreleased) ------------------ -- Nothing changed yet. +- Only log errors every 30 seconds 1.2.8 (2020-05-15) diff --git a/aiofluent/handler.py b/aiofluent/handler.py index 9885352..870f491 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -87,11 +87,12 @@ def _add_dic(data, dic): class LogQueue: - def __init__(self): - self._queue = None + def __init__(self, queue=None): + self._queue = queue async def consume_queue(self, initial_record, handler): - self._queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) + if self._queue is None: + self._queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) self._queue.put_nowait((initial_record, handler, time.time())) while True: record, handler, timestamp = await self._queue.get() @@ -140,6 +141,7 @@ def __init__(self, timeout=timeout, verbose=verbose, nanosecond_precision=self.nanosecond_precision, **kwargs) + self.last_warning_sent = 0 logging.Handler.__init__(self) def emit(self, record): @@ -159,9 +161,11 @@ def emit(self, record): except RuntimeError: sys.stderr.write("RuntimeError, likely event loop closing\n") except asyncio.QueueFull: - sys.stderr.write( - f'Fluentd hit max log queue size({MAX_QUEUE_SIZE}), ' - 'discarding message\n') + if time.time() - self.last_warning_sent > 30: + sys.stderr.write( + f'Fluentd hit max log queue size({MAX_QUEUE_SIZE}), ' + 'discarding message\n') + self.last_warning_sent = time.time() except AttributeError: sys.stderr.write('Error sending async fluentd message\n') diff --git a/aiofluent/sender.py b/aiofluent/sender.py index 46cd8de..d74fd7e 100644 --- a/aiofluent/sender.py +++ b/aiofluent/sender.py @@ -36,10 +36,10 @@ async def connection_factory(sender): asyncio.open_connection(sender._host, sender._port), sender._timeout) except (asyncio.TimeoutError, asyncio.CancelledError) as ex: - sys.stderr.write(f'Timeout connecting to fluentd') + sys.stderr.write('Timeout connecting to fluentd') sender.last_error = ex except Exception as ex: - sys.stderr.write(f'Unknown error connecting to fluentd') + sys.stderr.write('Unknown error connecting to fluentd') sender.last_error = ex diff --git a/tests/fixtures.py b/tests/fixtures.py index 2232f8f..aec5363 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -5,7 +5,7 @@ @pytest.fixture(scope="function") -async def mock_server(loop): +async def mock_server(): server = mockserver.MockRecvServer() yield server diff --git a/tests/test_handler.py b/tests/test_handler.py index d1fa534..474969b 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -1,21 +1,23 @@ import aiofluent.handler +from unittest.mock import patch import asyncio import logging import pytest -async def wait_for_queue(handler, loop): +async def wait_for_queue(handler): while handler._queue is None or handler._queue._queue is None: - await asyncio.sleep(0.01, loop=loop) + await asyncio.sleep(0.01) while handler._queue.qsize() > 0: - await asyncio.sleep(0.01, loop=loop) + await asyncio.sleep(0.01) @pytest.mark.asyncio -async def test_simple(mock_server, event_loop): +async def test_simple(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) @@ -23,7 +25,7 @@ async def test_simple(mock_server, event_loop): 'from': 'userA', 'to': 'userB' }) - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() assert 1 == len(data) @@ -32,16 +34,17 @@ async def test_simple(mock_server, event_loop): assert 'userA' == data[0][2]['from'] assert 'userB' == data[0][2]['to'] assert data[0][1] - assert isinstance(data[0][1], int) + assert isinstance(data[0][1], float) @pytest.mark.asyncio -async def test_custom_fmt(mock_server, event_loop): +async def test_custom_fmt(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter( aiofluent.handler.FluentRecordFormatter(fmt={ 'name': '%(name)s', @@ -52,7 +55,7 @@ async def test_custom_fmt(mock_server, event_loop): log.handlers = [] log.addHandler(handler) log.info({'sample': 'value'}) - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -63,17 +66,18 @@ async def test_custom_fmt(mock_server, event_loop): @pytest.mark.asyncio -async def test_json_encoded_message(mock_server, event_loop): +async def test_json_encoded_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info('{"key": "hello world!", "param": "value"}') - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -82,17 +86,18 @@ async def test_json_encoded_message(mock_server, event_loop): @pytest.mark.asyncio -async def test_unstructured_message(mock_server, event_loop): +async def test_unstructured_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info('hello %s', 'world') - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -101,17 +106,18 @@ async def test_unstructured_message(mock_server, event_loop): @pytest.mark.asyncio -async def test_unstructured_formatted_message(mock_server, event_loop): +async def test_unstructured_formatted_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info('hello world, %s', 'you!') - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -120,17 +126,18 @@ async def test_unstructured_formatted_message(mock_server, event_loop): @pytest.mark.asyncio -async def test_non_string_simple_message(mock_server, event_loop): +async def test_non_string_simple_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info(42) - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -138,17 +145,18 @@ async def test_non_string_simple_message(mock_server, event_loop): @pytest.mark.asyncio -async def test_non_string_dict_message(mock_server, event_loop): +async def test_non_string_dict_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info({42: 'root'}) - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -166,12 +174,13 @@ def cancel(self): @pytest.mark.asyncio -async def test_discard_message_over_limit(mock_server, event_loop): +async def test_discard_message_over_limit(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter( aiofluent.handler.FluentRecordFormatter(fmt={ 'name': '%(name)s', @@ -180,8 +189,8 @@ async def test_discard_message_over_limit(mock_server, event_loop): }) ) aiofluent.handler.FluentHandler._queue_task = MockQueueTask() - aiofluent.handler.FluentHandler._queue._queue = asyncio.Queue( - maxsize=aiofluent.handler.MAX_QUEUE_SIZE) + aiofluent.handler.FluentHandler._queue = aiofluent.handler.LogQueue( + asyncio.Queue(maxsize=aiofluent.handler.MAX_QUEUE_SIZE)) log.handlers = [] log.addHandler(handler) for _ in range(aiofluent.handler.MAX_QUEUE_SIZE): @@ -193,3 +202,37 @@ async def test_discard_message_over_limit(mock_server, event_loop): # does not add, same queue size... assert handler._queue.qsize() == qsize handler.close() + + +@pytest.mark.asyncio +async def test_discard_message_over_limit_does_not_space_stderr(mock_server): + handler = aiofluent.handler.FluentHandler( + 'app.follow', connection_factory=mock_server.factory) + + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) + handler.setFormatter( + aiofluent.handler.FluentRecordFormatter(fmt={ + 'name': '%(name)s', + 'lineno': '%(lineno)d', + 'emitted_at': '%(asctime)s', + }) + ) + aiofluent.handler.FluentHandler._queue_task = MockQueueTask() + aiofluent.handler.FluentHandler._queue = aiofluent.handler.LogQueue( + asyncio.Queue(maxsize=1)) + log.handlers = [] + log.addHandler(handler) + log.info({'sample': 'value'}) + + with patch('aiofluent.handler.sys.stderr.write') as stderr: + log.info({'sample': 'value'}) + stderr.assert_called() + + with patch('aiofluent.handler.sys.stderr.write') as stderr: + # should not log this time + log.info({'sample': 'value'}) + stderr.assert_not_called() + + handler.close() diff --git a/tests/test_sender.py b/tests/test_sender.py index a8c1a7a..6ebfa80 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -49,7 +49,6 @@ async def test_simple(mock_sender, mock_server): assert 'test.foo' == data[0][0] assert {'bar': 'baz'} == data[0][2] assert data[0][1] - assert isinstance(data[0][1], int) @pytest.mark.asyncio From f0c0e39278d9500e187337cfdb2c2894ca619d06 Mon Sep 17 00:00:00 2001 From: vangheem Date: Thu, 22 Oct 2020 15:20:03 -0400 Subject: [PATCH 29/31] Preparing release 1.2.9 --- CHANGELOG.rst | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index da3cbb3..f9f9de0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,4 +1,4 @@ -1.2.9 (unreleased) +1.2.9 (2020-10-22) ------------------ - Only log errors every 30 seconds diff --git a/setup.py b/setup.py index d530550..06d88b6 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.9.dev0', + version='1.2.9', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, From ff6c69d66e3b44826acfec75b17b955ea2a62e50 Mon Sep 17 00:00:00 2001 From: Kyle Date: Mon, 13 Jun 2022 13:26:06 -0500 Subject: [PATCH 30/31] Updating logging to add newline and exception details --- aiofluent/sender.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiofluent/sender.py b/aiofluent/sender.py index d74fd7e..92743b1 100644 --- a/aiofluent/sender.py +++ b/aiofluent/sender.py @@ -175,7 +175,7 @@ async def _async_send_internal(self, bytes_): return False except Exception as ex: self.last_error = ex - sys.stderr.write('Unhandled exception sending data') + sys.stderr.write(f'Unhandled exception sending data:\n{ex}\n') self.clean(bytes_) return False From 20e7530c0ec0948618ad373b9769a6f834f248e7 Mon Sep 17 00:00:00 2001 From: Kyle Date: Mon, 13 Jun 2022 13:26:28 -0500 Subject: [PATCH 31/31] Adding dev tag to version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 06d88b6..22f83cd 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.9', + version='1.2.9-dev', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'},