diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c8777a0..f9f9de0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,9 +1,58 @@ -1.2.1 (unreleased) +1.2.9 (2020-10-22) ------------------ -- Nothing changed yet. +- Only log errors every 30 seconds +1.2.8 (2020-05-15) +------------------ + +- Handle TypeError formatting log data + + +1.2.7 (2020-03-09) +------------------ + +- Fix repo location + + +1.2.6 (2020-01-06) +------------------ + +- Improve error logging + [vangheem] + +1.2.5 (2019-12-19) +------------------ + +- Handle event loop closed error + [vangheem] + + +1.2.4 (2019-12-19) +------------------ + +- Increase max queue size + + +1.2.3 (2019-04-01) +------------------ + +- Fix release + + +1.2.2 (2019-04-01) +------------------ + +- nanosecond_precision by default + [davidonna] + +1.2.1 (2018-10-31) +------------------ + +- Add support for nanosecond precision timestamps + [davidonna] + 1.2.0 (2018-06-14) ------------------ diff --git a/aiofluent/event.py b/aiofluent/event.py index 6ac9084..988452b 100644 --- a/aiofluent/event.py +++ b/aiofluent/event.py @@ -11,7 +11,7 @@ def __init__(self, label, data, **kwargs): self.label = label self.data = data self.sender_ = kwargs.get('sender', sender.get_global_sender()) - self.timestamp = kwargs.get('time', int(time.time())) + self.timestamp = kwargs.get('time', time.time()) async def __call__(self): await self.sender_.async_emit_with_time( @@ -21,5 +21,5 @@ async def __call__(self): async def send_event(label, data, **kwargs): assert isinstance(data, dict), 'data must be a dict' sender_ = kwargs.get('sender', sender.get_global_sender()) - timestamp = kwargs.get('time', int(time.time())) + timestamp = kwargs.get('time', time.time()) await sender_.async_emit_with_time(label, timestamp, data) diff --git a/aiofluent/handler.py b/aiofluent/handler.py index e47233c..870f491 100644 --- a/aiofluent/handler.py +++ b/aiofluent/handler.py @@ -8,6 +8,7 @@ import socket import sys import time +import traceback class FluentRecordFormatter(logging.Formatter, object): @@ -41,7 +42,7 @@ def format(self, record): for key, value in self._fmt_dict.items(): try: data[key] = value % record.__dict__ - except KeyError: + except (KeyError, TypeError): # we are okay with missing values here... pass @@ -81,24 +82,27 @@ def _add_dic(data, dic): data[str(key)] = value -MAX_QUEUE_SIZE = 100 +MAX_QUEUE_SIZE = 500 class LogQueue: - def __init__(self): - self._queue = None + def __init__(self, queue=None): + self._queue = queue async def consume_queue(self, initial_record, handler): - self._queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) - self._queue.put_nowait((initial_record, handler, int(time.time()))) + if self._queue is None: + self._queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) + self._queue.put_nowait((initial_record, handler, time.time())) while True: record, handler, timestamp = await self._queue.get() try: await handler.async_emit(record, timestamp) except: # noqa sys.stderr.write( - f'Error processing log') + 'Error processing log\n{}\n'.format( + traceback.format_exc() + )) finally: self._queue.task_done() @@ -127,13 +131,17 @@ def __init__(self, timeout=3, verbose=False, loop=None, + nanosecond_precision=False, **kwargs): self.loop = loop self.tag = tag + self.nanosecond_precision = nanosecond_precision self.sender = sender.FluentSender(tag, host=host, port=port, timeout=timeout, verbose=verbose, + nanosecond_precision=self.nanosecond_precision, **kwargs) + self.last_warning_sent = 0 logging.Handler.__init__(self) def emit(self, record): @@ -146,16 +154,20 @@ def emit(self, record): FluentHandler._queue.consume_queue(record, self), loop=self.loop) except RuntimeError: sys.stderr.write( - 'No event loop running to send log to fluentd') + 'No event loop running to send log to fluentd\n') else: try: - FluentHandler._queue.put_nowait((record, self, int(time.time()))) + FluentHandler._queue.put_nowait((record, self, time.time())) + except RuntimeError: + sys.stderr.write("RuntimeError, likely event loop closing\n") except asyncio.QueueFull: - sys.stderr.write( - f'Hit max log queue size({MAX_QUEUE_SIZE}), ' - 'discarding message') + if time.time() - self.last_warning_sent > 30: + sys.stderr.write( + f'Fluentd hit max log queue size({MAX_QUEUE_SIZE}), ' + 'discarding message\n') + self.last_warning_sent = time.time() except AttributeError: - sys.stderr.write('Error sending async fluentd message') + sys.stderr.write('Error sending async fluentd message\n') async def async_emit(self, record, timestamp=None): data = self.format(record) diff --git a/aiofluent/sender.py b/aiofluent/sender.py index 5b2ac74..92743b1 100644 --- a/aiofluent/sender.py +++ b/aiofluent/sender.py @@ -5,6 +5,7 @@ import sys import time import traceback +import struct _global_sender = None @@ -35,13 +36,24 @@ async def connection_factory(sender): asyncio.open_connection(sender._host, sender._port), sender._timeout) except (asyncio.TimeoutError, asyncio.CancelledError) as ex: - sys.stderr.write(f'Timeout connecting to fluentd') + sys.stderr.write('Timeout connecting to fluentd') sender.last_error = ex except Exception as ex: - sys.stderr.write(f'Unknown error connecting to fluentd') + sys.stderr.write('Unknown error connecting to fluentd') sender.last_error = ex +class EventTime(msgpack.ExtType): + def __new__(cls, timestamp): + seconds = int(timestamp) + nanoseconds = int(timestamp % 1 * 10 ** 9) + return super(EventTime, cls).__new__( + cls, + code=0, + data=struct.pack(">II", seconds, nanoseconds), + ) + + class FluentSender(object): def __init__(self, tag, @@ -53,6 +65,7 @@ def __init__(self, buffer_overflow_handler=None, retry_timeout=30, connection_factory=connection_factory, + nanosecond_precision=True, **kwargs): self._tag = tag @@ -62,6 +75,7 @@ def __init__(self, self._timeout = timeout self._verbose = verbose self._buffer_overflow_handler = buffer_overflow_handler + self._nanosecond_precision = nanosecond_precision self._pendings = None self._reader = None @@ -94,11 +108,17 @@ async def get_writer(self): return self._writer async def async_emit(self, label, data, timestamp=None): - if timestamp is None: - timestamp = int(time.time()) - return await self.async_emit_with_time(label, timestamp, data) + if timestamp is not None: + ev_time = timestamp + elif self._nanosecond_precision: + ev_time = EventTime(time.time()) + else: + ev_time = int(time.time()) + return await self.async_emit_with_time(label, ev_time, data) async def async_emit_with_time(self, label, timestamp, data): + if self._nanosecond_precision and isinstance(timestamp, float): + timestamp = EventTime(timestamp) try: bytes_ = self._make_packet(label, timestamp, data) except Exception as e: @@ -155,7 +175,7 @@ async def _async_send_internal(self, bytes_): return False except Exception as ex: self.last_error = ex - sys.stderr.write('Unhandled exception sending data') + sys.stderr.write(f'Unhandled exception sending data:\n{ex}\n') self.clean(bytes_) return False @@ -170,7 +190,7 @@ def _call_buffer_overflow_handler(self, pending_events): try: if self._buffer_overflow_handler: self._buffer_overflow_handler(pending_events) - except Exception as e: + except Exception: # User should care any exception in handler pass diff --git a/setup.py b/setup.py index 6c1cfe7..22f83cd 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='aiofluent', - version='1.2.1.dev0', + version='1.2.9-dev', description=desc, long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(), package_dir={'aiofluent': 'aiofluent'}, @@ -21,7 +21,7 @@ install_requires=['msgpack-python'], author='Nathan Van Gheem', author_email='vangheem@gmail.com', - url='https://github.com/onna/aiofluent', + url='https://github.com/guillotinaweb/aiofluent', download_url='http://pypi.python.org/pypi/aiofluent/', license='Apache License, Version 2.0', classifiers=[ @@ -31,11 +31,10 @@ ], extras_require={ 'test': [ - 'pytest<=3.1.0', - 'pytest-asyncio>=0.8.0', - 'pytest-aiohttp', + 'pytest>=3.8.0', + 'pytest-asyncio>=0.10.0', 'pytest-cov', - 'coverage==4.0.3' + 'coverage>=4.0.3', ] }, test_suite='tests' diff --git a/tests/fixtures.py b/tests/fixtures.py index 2232f8f..aec5363 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -5,7 +5,7 @@ @pytest.fixture(scope="function") -async def mock_server(loop): +async def mock_server(): server = mockserver.MockRecvServer() yield server diff --git a/tests/test_handler.py b/tests/test_handler.py index d1fa534..474969b 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -1,21 +1,23 @@ import aiofluent.handler +from unittest.mock import patch import asyncio import logging import pytest -async def wait_for_queue(handler, loop): +async def wait_for_queue(handler): while handler._queue is None or handler._queue._queue is None: - await asyncio.sleep(0.01, loop=loop) + await asyncio.sleep(0.01) while handler._queue.qsize() > 0: - await asyncio.sleep(0.01, loop=loop) + await asyncio.sleep(0.01) @pytest.mark.asyncio -async def test_simple(mock_server, event_loop): +async def test_simple(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) @@ -23,7 +25,7 @@ async def test_simple(mock_server, event_loop): 'from': 'userA', 'to': 'userB' }) - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() assert 1 == len(data) @@ -32,16 +34,17 @@ async def test_simple(mock_server, event_loop): assert 'userA' == data[0][2]['from'] assert 'userB' == data[0][2]['to'] assert data[0][1] - assert isinstance(data[0][1], int) + assert isinstance(data[0][1], float) @pytest.mark.asyncio -async def test_custom_fmt(mock_server, event_loop): +async def test_custom_fmt(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter( aiofluent.handler.FluentRecordFormatter(fmt={ 'name': '%(name)s', @@ -52,7 +55,7 @@ async def test_custom_fmt(mock_server, event_loop): log.handlers = [] log.addHandler(handler) log.info({'sample': 'value'}) - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -63,17 +66,18 @@ async def test_custom_fmt(mock_server, event_loop): @pytest.mark.asyncio -async def test_json_encoded_message(mock_server, event_loop): +async def test_json_encoded_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info('{"key": "hello world!", "param": "value"}') - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -82,17 +86,18 @@ async def test_json_encoded_message(mock_server, event_loop): @pytest.mark.asyncio -async def test_unstructured_message(mock_server, event_loop): +async def test_unstructured_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info('hello %s', 'world') - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -101,17 +106,18 @@ async def test_unstructured_message(mock_server, event_loop): @pytest.mark.asyncio -async def test_unstructured_formatted_message(mock_server, event_loop): +async def test_unstructured_formatted_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info('hello world, %s', 'you!') - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -120,17 +126,18 @@ async def test_unstructured_formatted_message(mock_server, event_loop): @pytest.mark.asyncio -async def test_non_string_simple_message(mock_server, event_loop): +async def test_non_string_simple_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info(42) - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -138,17 +145,18 @@ async def test_non_string_simple_message(mock_server, event_loop): @pytest.mark.asyncio -async def test_non_string_dict_message(mock_server, event_loop): +async def test_non_string_dict_message(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter(aiofluent.handler.FluentRecordFormatter()) log.handlers = [] log.addHandler(handler) log.info({42: 'root'}) - await wait_for_queue(handler, event_loop) + await wait_for_queue(handler) handler.close() data = mock_server.get_recieved() @@ -166,12 +174,13 @@ def cancel(self): @pytest.mark.asyncio -async def test_discard_message_over_limit(mock_server, event_loop): +async def test_discard_message_over_limit(mock_server): handler = aiofluent.handler.FluentHandler( 'app.follow', connection_factory=mock_server.factory) logging.basicConfig(level=logging.INFO) log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) handler.setFormatter( aiofluent.handler.FluentRecordFormatter(fmt={ 'name': '%(name)s', @@ -180,8 +189,8 @@ async def test_discard_message_over_limit(mock_server, event_loop): }) ) aiofluent.handler.FluentHandler._queue_task = MockQueueTask() - aiofluent.handler.FluentHandler._queue._queue = asyncio.Queue( - maxsize=aiofluent.handler.MAX_QUEUE_SIZE) + aiofluent.handler.FluentHandler._queue = aiofluent.handler.LogQueue( + asyncio.Queue(maxsize=aiofluent.handler.MAX_QUEUE_SIZE)) log.handlers = [] log.addHandler(handler) for _ in range(aiofluent.handler.MAX_QUEUE_SIZE): @@ -193,3 +202,37 @@ async def test_discard_message_over_limit(mock_server, event_loop): # does not add, same queue size... assert handler._queue.qsize() == qsize handler.close() + + +@pytest.mark.asyncio +async def test_discard_message_over_limit_does_not_space_stderr(mock_server): + handler = aiofluent.handler.FluentHandler( + 'app.follow', connection_factory=mock_server.factory) + + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + log.setLevel(logging.INFO) + handler.setFormatter( + aiofluent.handler.FluentRecordFormatter(fmt={ + 'name': '%(name)s', + 'lineno': '%(lineno)d', + 'emitted_at': '%(asctime)s', + }) + ) + aiofluent.handler.FluentHandler._queue_task = MockQueueTask() + aiofluent.handler.FluentHandler._queue = aiofluent.handler.LogQueue( + asyncio.Queue(maxsize=1)) + log.handlers = [] + log.addHandler(handler) + log.info({'sample': 'value'}) + + with patch('aiofluent.handler.sys.stderr.write') as stderr: + log.info({'sample': 'value'}) + stderr.assert_called() + + with patch('aiofluent.handler.sys.stderr.write') as stderr: + # should not log this time + log.info({'sample': 'value'}) + stderr.assert_not_called() + + handler.close() diff --git a/tests/test_sender.py b/tests/test_sender.py index a8c1a7a..6ebfa80 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -49,7 +49,6 @@ async def test_simple(mock_sender, mock_server): assert 'test.foo' == data[0][0] assert {'bar': 'baz'} == data[0][2] assert data[0][1] - assert isinstance(data[0][1], int) @pytest.mark.asyncio