diff --git a/README.md b/README.md index 8641d526..ccd9463a 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ pip install singer-target-postgres ```bash ~/.virtualenvs/tap-something/bin/tap-something \ | ~/.virtualenvs/target-postgres/bin/target-postgres \ - --config ~/singer.io/target_postgres_config.json + --config ~/singer.io/target_postgres_config.json >> state.json ``` ### Config.json @@ -93,7 +93,6 @@ _The above is copied from the [current list of versions](https://www.postgresql. ## Known Limitations -- Ignores `STATE` Singer messages. - Requires a [JSON Schema](https://json-schema.org/) for every stream. - Only string, string with date-time format, integer, number, boolean, object, and array types with or without null are supported. Arrays can diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py index de3eedf0..9f644246 100644 --- a/target_postgres/target_tools.py +++ b/target_postgres/target_tools.py @@ -36,6 +36,7 @@ def stream_to_target(stream, target, config={}): :return: None """ + state_writer = sys.stdout streams = {} try: if not config.get('disable_collection', False): @@ -55,7 +56,8 @@ def stream_to_target(stream, target, config={}): invalid_records_threshold, max_batch_rows, max_batch_size, - line) + line, + state_writer) if line_count > 0 and line_count % batch_detection_threshold == 0: _flush_streams(streams, target) line_count += 1 @@ -98,7 +100,7 @@ def _report_invalid_records(streams): def _line_handler(streams, target, invalid_records_detect, invalid_records_threshold, max_batch_rows, max_batch_size, - line): + line, state_writer): try: line_data = json.loads(line) except json.decoder.JSONDecodeError: @@ -163,7 +165,9 @@ def _line_handler(streams, target, invalid_records_detect, invalid_records_thres target.write_batch(stream_buffer) target.activate_version(stream_buffer, line_data['version']) elif line_data['type'] == 'STATE': - LOGGER.warning('`STATE` Singer message type not supported') + line = json.dumps(line_data['value']) + state_writer.write("{}\n".format(line)) + state_writer.flush() else: raise TargetError('Unknown message type {} in message {}'.format( line_data['type'], diff --git a/tests/test_target_tools.py b/tests/test_target_tools.py index a3ea61ea..3fdc4041 100644 --- a/tests/test_target_tools.py +++ b/tests/test_target_tools.py @@ -1,4 +1,5 @@ from copy import deepcopy +import json from unittest.mock import patch import pytest @@ -70,3 +71,19 @@ def test_loading__invalid__records__threshold(): target_tools.stream_to_target(InvalidCatStream(20), target, config=config) assert len(target.calls['write_batch']) == 0 + + +def test_state__capture(capsys): + stream = [ + json.dumps({'type': 'STATE', 'value': { 'test': 'state-1' }}), + json.dumps({'type': 'STATE', 'value': { 'test': 'state-2' }})] + + target_tools.stream_to_target(stream, Target()) + + out, _ = capsys.readouterr() + + filtered_output = list(filter(None, out.split('\n'))) + + assert len(filtered_output) == 2 + assert json.loads(filtered_output[0])['test'] == 'state-1' + assert json.loads(filtered_output[1])['test'] == 'state-2'