diff --git a/config/default.yaml b/config/default.yaml index 16cccc19..ea761350 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -23,6 +23,7 @@ gcp: table: maxFlushTime: bufferSize: + streaming: true github: client_id: GITHUB_CLIENT_ID diff --git a/config/production.yaml b/config/production.yaml index 16cccc19..ea761350 100644 --- a/config/production.yaml +++ b/config/production.yaml @@ -23,6 +23,7 @@ gcp: table: maxFlushTime: bufferSize: + streaming: true github: client_id: GITHUB_CLIENT_ID diff --git a/database/bigquery.js b/database/bigquery.js index 167a8671..791f910a 100644 --- a/database/bigquery.js +++ b/database/bigquery.js @@ -53,22 +53,28 @@ module.exports = function (config) { } const bigquery = new BigQuery(); - const recordBuffer = new RecordBuffer({ - maxFlushTime: config.maxFlushTime || 5 * 60 * 1000, - bufferSize: config.bufferSize || 100 - }); - recordBuffer.on('flush', (filename) => { - bigquery - .dataset(config.dataset) - .table(config.table) - .load(filename, { format: 'JSON', ignoreUnknownValues: true }) - .then(() => { - if (isProduction) { - fs.unlink(filename, () => { }); - } - }) - .catch(e => console.error('error loading into bigquery', e)); - }); + + let recordBuffer = null; + if (!config.streaming) + { + recordBuffer = new RecordBuffer({ + maxFlushTime: config.maxFlushTime || 5 * 60 * 1000, + bufferSize: config.bufferSize || 100 + }); + recordBuffer.on('flush', (filename) => { + bigquery + .dataset(config.dataset) + .table(config.table) + .load(filename, { format: 'JSON', ignoreUnknownValues: true }) + .then(() => { + if (isProduction) { + fs.unlink(filename, () => { }); + } + }) + .catch(e => console.error('error loading into bigquery', e)); + }); + } + return { put: function (pageUrl, clientId, connectionId, clientFeatures, connectionFeatures, streamFeatures) { const d = new Date().getTime(); @@ -81,7 +87,22 @@ module.exports = function (config) { }; Object.assign(item, clientFeatures, connectionFeatures, streamFeatures); - recordBuffer.put(item); + + if (config.streaming) + { + bigquery + .dataset(config.dataset) + .table(config.table) + .insert(item, { ignoreUnknownValues: true }) + .then((res) => { + logger.debug("Successful streaming insert into BigQuery", res); + }) + .catch(e => console.error('error insert into bigquery', e)); + } + else + { + recordBuffer.put(item); + } }, }; }