Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c61745e
Backport nanosecond_precision support from fluent/fluent-logger-pytho…
davidonna Oct 31, 2018
0ac2f71
Preparing release 1.2.1
vangheem Oct 31, 2018
be26dde
Back to development: 1.2.2
vangheem Oct 31, 2018
f7310ef
nanosec_precision by default, revbump to 1.2.2
Nov 8, 2018
d026f8f
Preparing release 1.2.2
vangheem Apr 1, 2019
f6a5148
Back to development: 1.2.3
vangheem Apr 1, 2019
340d45c
tweak test req
vangheem Apr 1, 2019
22dfea0
bump
vangheem Apr 1, 2019
8d5318f
Preparing release 1.2.3
vangheem Apr 1, 2019
8f6303c
Back to development: 1.2.4
vangheem Apr 1, 2019
922e214
increase max queue size
vangheem Dec 19, 2019
0bc5b9a
better msg
vangheem Dec 19, 2019
5ca79a8
Preparing release 1.2.4
vangheem Dec 19, 2019
f8e8101
Back to development: 1.2.5
vangheem Dec 19, 2019
ebc3007
Handle event loop closed error
vangheem Dec 20, 2019
d1eb3b8
Preparing release 1.2.5
vangheem Dec 20, 2019
f1e0e57
Back to development: 1.2.6
vangheem Dec 20, 2019
cd28780
Improve error logging
vangheem Jan 6, 2020
8f5cbe7
Preparing release 1.2.6
vangheem Jan 6, 2020
0f92cd0
Back to development: 1.2.7
vangheem Jan 6, 2020
0a41ba7
fix location to aiofluent
vangheem Mar 9, 2020
d3c56c4
cl
vangheem Mar 9, 2020
867ffcf
Preparing release 1.2.7
vangheem Mar 9, 2020
2078236
Back to development: 1.2.8
vangheem Mar 9, 2020
e69ca17
handle TypeError
vangheem May 15, 2020
f623420
Preparing release 1.2.8
vangheem May 15, 2020
93afff3
Back to development: 1.2.9
vangheem May 15, 2020
f1147e7
Only log errors every 30 seconds (#5)
vangheem Oct 22, 2020
f0c0e39
Preparing release 1.2.9
vangheem Oct 22, 2020
ff6c69d
Updating logging to add newline and exception details
kyle-widmann Jun 13, 2022
20e7530
Adding dev tag to version
kyle-widmann Jun 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,58 @@
1.2.1 (unreleased)
1.2.9 (2020-10-22)
------------------

- Nothing changed yet.
- Only log errors every 30 seconds


1.2.8 (2020-05-15)
------------------

- Handle TypeError formatting log data


1.2.7 (2020-03-09)
------------------

- Fix repo location


1.2.6 (2020-01-06)
------------------

- Improve error logging
[vangheem]

1.2.5 (2019-12-19)
------------------

- Handle event loop closed error
[vangheem]


1.2.4 (2019-12-19)
------------------

- Increase max queue size


1.2.3 (2019-04-01)
------------------

- Fix release


1.2.2 (2019-04-01)
------------------

- nanosecond_precision by default
[davidonna]

1.2.1 (2018-10-31)
------------------

- Add support for nanosecond precision timestamps
[davidonna]

1.2.0 (2018-06-14)
------------------

Expand Down
4 changes: 2 additions & 2 deletions aiofluent/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def __init__(self, label, data, **kwargs):
self.label = label
self.data = data
self.sender_ = kwargs.get('sender', sender.get_global_sender())
self.timestamp = kwargs.get('time', int(time.time()))
self.timestamp = kwargs.get('time', time.time())

async def __call__(self):
await self.sender_.async_emit_with_time(
Expand All @@ -21,5 +21,5 @@ async def __call__(self):
async def send_event(label, data, **kwargs):
assert isinstance(data, dict), 'data must be a dict'
sender_ = kwargs.get('sender', sender.get_global_sender())
timestamp = kwargs.get('time', int(time.time()))
timestamp = kwargs.get('time', time.time())
await sender_.async_emit_with_time(label, timestamp, data)
38 changes: 25 additions & 13 deletions aiofluent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import socket
import sys
import time
import traceback


class FluentRecordFormatter(logging.Formatter, object):
Expand Down Expand Up @@ -41,7 +42,7 @@ def format(self, record):
for key, value in self._fmt_dict.items():
try:
data[key] = value % record.__dict__
except KeyError:
except (KeyError, TypeError):
# we are okay with missing values here...
pass

Expand Down Expand Up @@ -81,24 +82,27 @@ def _add_dic(data, dic):
data[str(key)] = value


MAX_QUEUE_SIZE = 100
MAX_QUEUE_SIZE = 500


class LogQueue:

def __init__(self):
self._queue = None
def __init__(self, queue=None):
self._queue = queue

async def consume_queue(self, initial_record, handler):
self._queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
self._queue.put_nowait((initial_record, handler, int(time.time())))
if self._queue is None:
self._queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
self._queue.put_nowait((initial_record, handler, time.time()))
while True:
record, handler, timestamp = await self._queue.get()
try:
await handler.async_emit(record, timestamp)
except: # noqa
sys.stderr.write(
f'Error processing log')
'Error processing log\n{}\n'.format(
traceback.format_exc()
))
finally:
self._queue.task_done()

Expand Down Expand Up @@ -127,13 +131,17 @@ def __init__(self,
timeout=3,
verbose=False,
loop=None,
nanosecond_precision=False,
**kwargs):
self.loop = loop
self.tag = tag
self.nanosecond_precision = nanosecond_precision
self.sender = sender.FluentSender(tag,
host=host, port=port,
timeout=timeout, verbose=verbose,
nanosecond_precision=self.nanosecond_precision,
**kwargs)
self.last_warning_sent = 0
logging.Handler.__init__(self)

def emit(self, record):
Expand All @@ -146,16 +154,20 @@ def emit(self, record):
FluentHandler._queue.consume_queue(record, self), loop=self.loop)
except RuntimeError:
sys.stderr.write(
'No event loop running to send log to fluentd')
'No event loop running to send log to fluentd\n')
else:
try:
FluentHandler._queue.put_nowait((record, self, int(time.time())))
FluentHandler._queue.put_nowait((record, self, time.time()))
except RuntimeError:
sys.stderr.write("RuntimeError, likely event loop closing\n")
except asyncio.QueueFull:
sys.stderr.write(
f'Hit max log queue size({MAX_QUEUE_SIZE}), '
'discarding message')
if time.time() - self.last_warning_sent > 30:
sys.stderr.write(
f'Fluentd hit max log queue size({MAX_QUEUE_SIZE}), '
'discarding message\n')
self.last_warning_sent = time.time()
except AttributeError:
sys.stderr.write('Error sending async fluentd message')
sys.stderr.write('Error sending async fluentd message\n')

async def async_emit(self, record, timestamp=None):
data = self.format(record)
Expand Down
34 changes: 27 additions & 7 deletions aiofluent/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import time
import traceback
import struct

_global_sender = None

Expand Down Expand Up @@ -35,13 +36,24 @@ async def connection_factory(sender):
asyncio.open_connection(sender._host, sender._port),
sender._timeout)
except (asyncio.TimeoutError, asyncio.CancelledError) as ex:
sys.stderr.write(f'Timeout connecting to fluentd')
sys.stderr.write('Timeout connecting to fluentd')
sender.last_error = ex
except Exception as ex:
sys.stderr.write(f'Unknown error connecting to fluentd')
sys.stderr.write('Unknown error connecting to fluentd')
sender.last_error = ex


class EventTime(msgpack.ExtType):
def __new__(cls, timestamp):
seconds = int(timestamp)
nanoseconds = int(timestamp % 1 * 10 ** 9)
return super(EventTime, cls).__new__(
cls,
code=0,
data=struct.pack(">II", seconds, nanoseconds),
)


class FluentSender(object):
def __init__(self,
tag,
Expand All @@ -53,6 +65,7 @@ def __init__(self,
buffer_overflow_handler=None,
retry_timeout=30,
connection_factory=connection_factory,
nanosecond_precision=True,
**kwargs):

self._tag = tag
Expand All @@ -62,6 +75,7 @@ def __init__(self,
self._timeout = timeout
self._verbose = verbose
self._buffer_overflow_handler = buffer_overflow_handler
self._nanosecond_precision = nanosecond_precision

self._pendings = None
self._reader = None
Expand Down Expand Up @@ -94,11 +108,17 @@ async def get_writer(self):
return self._writer

async def async_emit(self, label, data, timestamp=None):
if timestamp is None:
timestamp = int(time.time())
return await self.async_emit_with_time(label, timestamp, data)
if timestamp is not None:
ev_time = timestamp
elif self._nanosecond_precision:
ev_time = EventTime(time.time())
else:
ev_time = int(time.time())
return await self.async_emit_with_time(label, ev_time, data)

async def async_emit_with_time(self, label, timestamp, data):
if self._nanosecond_precision and isinstance(timestamp, float):
timestamp = EventTime(timestamp)
try:
bytes_ = self._make_packet(label, timestamp, data)
except Exception as e:
Expand Down Expand Up @@ -155,7 +175,7 @@ async def _async_send_internal(self, bytes_):
return False
except Exception as ex:
self.last_error = ex
sys.stderr.write('Unhandled exception sending data')
sys.stderr.write(f'Unhandled exception sending data:\n{ex}\n')
self.clean(bytes_)
return False

Expand All @@ -170,7 +190,7 @@ def _call_buffer_overflow_handler(self, pending_events):
try:
if self._buffer_overflow_handler:
self._buffer_overflow_handler(pending_events)
except Exception as e:
except Exception:
# User should care any exception in handler
pass

Expand Down
11 changes: 5 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@

setup(
name='aiofluent',
version='1.2.1.dev0',
version='1.2.9-dev',
description=desc,
long_description=open(README).read() + '\n\n' + open(CHANGELOG).read(),
package_dir={'aiofluent': 'aiofluent'},
packages=['aiofluent'],
install_requires=['msgpack-python'],
author='Nathan Van Gheem',
author_email='vangheem@gmail.com',
url='https://github.com/onna/aiofluent',
url='https://github.com/guillotinaweb/aiofluent',
download_url='http://pypi.python.org/pypi/aiofluent/',
license='Apache License, Version 2.0',
classifiers=[
Expand All @@ -31,11 +31,10 @@
],
extras_require={
'test': [
'pytest<=3.1.0',
'pytest-asyncio>=0.8.0',
'pytest-aiohttp',
'pytest>=3.8.0',
'pytest-asyncio>=0.10.0',
'pytest-cov',
'coverage==4.0.3'
'coverage>=4.0.3',
]
},
test_suite='tests'
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


@pytest.fixture(scope="function")
async def mock_server(loop):
async def mock_server():
server = mockserver.MockRecvServer()
yield server

Expand Down
Loading