This repository was archived by the owner on Sep 15, 2025. It is now read-only.
app.js: 126 changes (80 additions, 46 deletions)
@@ -12,6 +12,7 @@ const WebSocketServer = require('ws').Server;
const maxmind = require('maxmind');
const cityLookup = maxmind.open('./GeoLite2-City.mmdb');

const {tempStreamPath, tempPath} = require('./utils');
const obfuscate = require('./obfuscator');
const Database = require('./database')({
firehose: config.get('firehose'),
@@ -21,7 +22,6 @@ const Store = require('./store')({
});

let server;
const tempPath = 'temp';

const prom = require('prom-client');
const connected = new prom.Gauge({
@@ -43,18 +43,19 @@ class ProcessQueue {
this.q = [];
this.numProc = 0;
}
enqueue(clientid) {
this.q.push(clientid);
enqueue(clientid, peerConnectionId) {
this.q.push({clientid, peerConnectionId});
if (this.numProc < this.maxProc) {
process.nextTick(this.process.bind(this));
} else {
console.log('process Q too long:', this.numProc);
}
}
process() {
const clientid = this.q.shift();
if (!clientid) return;
const p = child_process.fork('extract.js', [clientid]);
const next = this.q.shift();
if (!next) return;
const {clientid, peerConnectionId} = next;
const p = child_process.fork('extract.js', [clientid, peerConnectionId]);
p.on('exit', (code) => {
this.numProc--;
console.log('done', clientid, this.numProc, 'code=' + code);
@@ -65,16 +66,16 @@ }
}
if (this.numProc < 0) this.numProc = 0;
if (this.numProc < this.maxProc) process.nextTick(this.process.bind(this));
fs.readFile(tempPath + '/' + clientid, {encoding: 'utf-8'}, (err, data) => {
fs.readFile(tempStreamPath(clientid, peerConnectionId), {encoding: 'utf-8'}, (err, data) => {
if (err) {
console.error('Could not open file for store upload', err);
return;
}
// remove the file
fs.unlink(tempPath + '/' + clientid, () => {
fs.unlink(tempStreamPath(clientid, peerConnectionId), () => {
// we're good...
});
Store.put(clientid, data);
Store.put(`${clientid}-${peerConnectionId}`, data);
});
});
p.on('message', (msg) => {
@@ -84,7 +85,7 @@
p.on('error', () => {
this.numProc--;
console.log('failed to spawn, rescheduling', clientid, this.numProc);
this.q.push(clientid); // do not immediately retry
this.q.push({clientid, peerConnectionId}); // do not immediately retry
});
this.numProc++;
if (this.numProc > 10) {
@@ -152,66 +153,100 @@ function run(keys) {
const wss = new WebSocketServer({ server: server });
wss.on('connection', (client, upgradeReq) => {
connected.inc();
let numberOfEvents = 0;
// the url the client is coming from
const referer = upgradeReq.headers['origin'] + upgradeReq.url;
// TODO: check against known/valid urls

const ua = upgradeReq.headers['user-agent'];
const clientid = uuid.v4();
let tempStream = fs.createWriteStream(tempPath + '/' + clientid);
tempStream.on('finish', () => {
if (numberOfEvents > 0) {
q.enqueue(clientid);
} else {
fs.unlink(tempPath + '/' + clientid, () => {
// we're good...

const meta = () => {
return {
path: upgradeReq.url,
origin: upgradeReq.headers['origin'],
url: referer,
userAgent: ua,
time: Date.now(),
fileFormat: 2
}
}

const tempStreams = {}
const write = (data, peerConnectionId) => {
if (!peerConnectionId) {
return;
}
let tempStream = tempStreams[peerConnectionId];
if (!tempStream) {
//Create new temp file
const streamPath = tempStreamPath(clientid, peerConnectionId);
tempStream = fs.createWriteStream(streamPath);
tempStream.on('finish', () => {
q.enqueue(clientid, peerConnectionId);
});
tempStream.write(JSON.stringify(meta()) + '\n');
const forwardedFor = upgradeReq.headers['x-forwarded-for'];
const {remoteAddress} = upgradeReq.connection;
const address = forwardedFor || remoteAddress;
if (address) {
process.nextTick(() => {
const city = cityLookup.get(address);
if (tempStream) {
write(['location', null, city, Date.now()], peerConnectionId);
}
});
}
tempStreams[peerConnectionId] = tempStream;
}
});
if(tempStream.writable) {
tempStream.write(JSON.stringify(data) + '\n');
} else {
console.error("Unable to write to stream: ", data, clientid, peerConnectionId);
}
}

const meta = {
path: upgradeReq.url,
origin: upgradeReq.headers['origin'],
url: referer,
userAgent: ua,
time: Date.now(),
fileFormat: 2,
};
tempStream.write(JSON.stringify(meta) + '\n');
const closeStream = (peerConnectionId) => {
if (!peerConnectionId) {
return;
}
let tempStream = tempStreams[peerConnectionId];
if (tempStream) {
write(['close', peerConnectionId, null, Date.now()], peerConnectionId);
tempStream.end();
}
}

const forwardedFor = upgradeReq.headers['x-forwarded-for'];
const {remoteAddress} = upgradeReq.connection;
const address = forwardedFor || remoteAddress;
if (address) {
process.nextTick(() => {
const city = cityLookup.get(address);
if (tempStream) {
tempStream.write(JSON.stringify(['location', null, city, Date.now()]) + '\n');
}
});
const timeouts = {};
const handlePeerConnectionEnd = (peerConnectionId) => {
if (!peerConnectionId) return;
clearTimeout(timeouts[peerConnectionId]);
// Allow time for remaining events to come in
timeouts[peerConnectionId] = setTimeout(() => {
closeStream(peerConnectionId);
}, 5000);
}

console.log('connected', ua, referer, clientid);

client.on('message', msg => {
try {
const data = JSON.parse(msg);

numberOfEvents++;
const peerConnectionId = data[1];

if (data[0].endsWith('OnError')) {
// monkey-patch java/swift sdk bugs.
data[0] = data[0].replace('OnError', 'OnFailure');
}
switch(data[0]) {
case 'close':
handlePeerConnectionEnd(peerConnectionId);
case 'getUserMedia':
case 'getUserMediaOnSuccess':
case 'getUserMediaOnFailure':
case 'navigator.mediaDevices.getUserMedia':
case 'navigator.mediaDevices.getUserMediaOnSuccess':
case 'navigator.mediaDevices.getUserMediaOnFailure':
tempStream.write(JSON.stringify(data) + '\n');
write(data, peerConnectionId);
break;
case 'constraints':
if (data[2].constraintsOptional) { // workaround for RtcStats.java bug.
@@ -222,7 +257,7 @@
});
delete data[2].constraintsOptional;
}
tempStream.write(JSON.stringify(data) + '\n');
write(data, peerConnectionId);
break;
default:
if (data[0] === 'getstats' && data[2].values) { // workaround for RtcStats.java bug.
@@ -231,7 +266,7 @@
data[2].timestamp = timestamp;
}
obfuscate(data);
tempStream.write(JSON.stringify(data) + '\n');
write(data, peerConnectionId);
break;
}
} catch(e) {
@@ -241,9 +276,8 @@

client.on('close', () => {
connected.dec();
tempStream.write(JSON.stringify(['close', null, null, Date.now()]));
tempStream.end();
tempStream = null;
const remainingStreams = Object.keys(tempStreams);
remainingStreams.forEach(closeStream);
});
});
}
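Both app.js and extract.js now import tempStreamPath from ./utils, but utils.js itself is not part of this diff. A minimal sketch of what that helper is assumed to look like, given that it is called as tempStreamPath(clientid, peerConnectionId) and that Store.put now keys the upload as `${clientid}-${peerConnectionId}`:

// utils.js (sketch only -- this file is not shown in the diff)
const path = require('path');

const tempPath = 'temp';

// Assumed helper: one temp file per (clientid, peerConnectionId) pair,
// mirroring the `${clientid}-${peerConnectionId}` key used by Store.put.
function tempStreamPath(clientid, peerConnectionId) {
  return path.join(tempPath, `${clientid}-${peerConnectionId}`);
}

// The real utils.js also exports extractTracks and extractStreams
// (imported by extract.js below); they are omitted here.
module.exports = {tempPath, tempStreamPath};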
extract.js: 5 changes (3 additions, 2 deletions)
@@ -22,7 +22,7 @@ const clientfeatures = require('./features-client');
const streamfeatures = require('./features-stream');
const statsDecompressor = require('./getstats-deltacompression').decompress;
const statsMangler = require('./getstats-mangle');
const {extractTracks, extractStreams} = require('./utils');
const {extractTracks, extractStreams, tempStreamPath} = require('./utils');

// dumps all peerconnections.
function dump(url, client, clientid, data) {
@@ -134,7 +134,8 @@ function generateFeatures(url, client, clientid) {
}

const clientid = process.argv[2];
const path = 'temp/' + clientid;
const peerConnectionId = process.argv[3];
const path = tempStreamPath(clientid, peerConnectionId);
fs.readFile(path, {encoding: 'utf-8'}, (err, data) => {
if (err) {
console.error(err, path);
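For context, the shape of the events that end up in these per-connection temp files can be read off the handlers above: the server parses each WebSocket frame as an array where data[0] is the event name, data[1] the peer connection id and data[2] the payload, and its own synthetic entries (location, close) append Date.now() as a fourth element. A hypothetical client-side trace, with 'PC_0' as a placeholder id and assumed event names and payload shapes, might look like this:

// Hypothetical events as sent by an rtcstats-style client (illustrative only).
//   [eventName, peerConnectionId, payload]
const events = [
  ['create', 'PC_0', {iceServers: []}],                                  // assumed event name
  ['getstats', 'PC_0', {timestamp: Date.now(), values: {/* stats */}}],  // RtcStats.java-style payload
  ['close', 'PC_0', null],
];

// Each event travels as one JSON text frame and is appended as one JSON line
// to the temp file for that (clientid, peerConnectionId) pair; note that the
// write() helper above silently drops events without a peerConnectionId.
events.forEach((event) => console.log(JSON.stringify(event)));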