diff --git a/README.md b/README.md index bac0e37a..54b566a3 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,83 @@ # SensorFlow -A pure Java library for the unified sensor-data streams management. +A framework for the unified sensor-data streams management. +The framework is composed of two blocks: +1. Real-life sensing +2. Data management + +**Issues** +1. Heterogeneity: of devices, protocols, cloud solutions and applications +2. Synchronization +3. Security/privacy + +**Key paradigm**: Modularity and common interfaces + +**Components**: +* BSN Manager (BSNM) +* Input Module +* Output Module +* Processing Module +* User Interface + +**Data types**: +* Signals: Series of (timestamp,value) pairs conveying the information from the sensors +* Metadata: Information about the composition of the BSN and characteristics of the collected signals +* Messages: Series of (timestamp, {event}) pairs that describe the status of the network with possible errors. Used by the modules to communicate their current status to the BSNM. + +## Components +### BSNM +Manages the BSN through the connected modules. +Main functions: +* Set the BSN and provide a time reference to the input plugins for the synchronization (together with latency estimation) +* Check the status of the BSN +* Start and stop the acquisition + +### Input Module +Manages the input protocols and exposes a common interface to the BSNM. +Two distinct functions: +* Interfaces with the device +* Manages the data streams + +### Output Module +Manages the output protocols and exposes a common interface to the BSNM according to the application +Two distinct functions: +* Interfaces with the destination +* Manages the security/privacy + +### Processing Module +Provides real-time capabilities +Two types: +* (quasi) Real-Time +* Post-acquisition + +### User Interface +Exploits the BSNM to allow the user to set the BSN + +## Development plan +### API::DJango Rest Server +* DJango admin [Done] +* Authenticate users via token [Done] +* Provide profile info [Done] +* Provide tokens to crossbar for auth +* Browse uploads +* User MRUs +* Browse experiments and sessions +* Browse plugins + +### Researcher::WebGUI +* Data management & download +* Session management: experiments and if they have real time feedback +* Plugin management +* Account management + +### Experimenter::Mobile +* Authenticates [Done] +* Uploads data (September) +* Receives feedback + +### Plugin::Remote Client +* Authenticates +* Receives data through WebSocket/MQTT + +### Database::MongoDB Server +* Design schema [Done] +* Define data ("Collection") diff --git a/build.gradle b/build.gradle index 5207b8a4..ed3569e5 100644 --- a/build.gradle +++ b/build.gradle @@ -1,10 +1,7 @@ apply plugin: 'java' -//apply plugin: 'maven' -// -//group = 'com.github.MPBA' -// -// -//dependencies { -// targetCompatibility = '1.7' -// sourceCompatibility = '1.7' -//} + +dependencies { + targetCompatibility = '1.7' + sourceCompatibility = '1.7' + testCompile 'junit:junit:4.12' +} \ No newline at end of file diff --git a/src/main/java/eu/fbk/mpba/sensorflow/Input.java b/src/main/java/eu/fbk/mpba/sensorflow/Input.java new file mode 100644 index 00000000..40eb5776 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/Input.java @@ -0,0 +1,243 @@ +package eu.fbk.mpba.sensorflow; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public abstract class Input implements InputGroup { + + // Static time - things + + private static long bootTime = System.currentTimeMillis() * 1_000_000L - System.nanoTime(); + private static TimeSource time = new TimeSource() { + + @Override + public long getMonoUTCNanos() { + return System.nanoTime() + bootTime; + } + + @Override + public long getMonoUTCNanos(long systemNanoTime) { + return systemNanoTime + bootTime; + } + }; + + private static AtomicLong sequential = new AtomicLong(1L); + + // Fields + + public final int intUid; + private final boolean reactive; + private volatile boolean listened = true; + private String name; + + private InputGroup parent; + private List header; + private Set outputs = new HashSet<>(); + private ReentrantReadWriteLock outputsAccess = new ReentrantReadWriteLock(false); + private Map dictionary = new HashMap<>(); + private ReentrantReadWriteLock dictionaryAccess = new ReentrantReadWriteLock(false); + private long holdTimestamp; + private double[] holdValue; + private long holdTimestampLog; + private String holdValueLog; + + // Constructors + + protected Input(Collection header) { + this(null, Input.class.getSimpleName(), header, false); + } + + protected Input(InputGroup parent, Collection header) { + this(parent, Input.class.getSimpleName(), header,false); + } + + protected Input(InputGroup parent, String name, Collection header) { + this(parent, name, header, false); + } + + protected Input(InputGroup parent, String name, Collection header, boolean reactive) { + this.reactive = reactive; + long longUid = sequential.getAndIncrement(); + this.intUid = (int) longUid / 2 * ((int) longUid % 2 * 2 - 1); + this.parent = parent; + this.name = name != null ? name : getClass().getSimpleName() + "-" + hashCode(); + this.header = new ArrayList<>(header); + } + + public void setName(String name) { + this.name = name; + } + + // Outputs access + + void addOutput(OutputManager output) { + outputsAccess.writeLock().lock(); + outputs.add(output); + outputsAccess.writeLock().unlock(); + pushDictionary(output); + if (listened && reactive) { + if (holdValue != null) // "&& reactive" is implicit + pushValueInner(holdTimestamp, holdValue); + if (holdValueLog != null) // "&& reactive" is implicit + pushLogInner(holdTimestampLog, holdValueLog); + } + } + + void removeOutput(OutputManager output) { + outputsAccess.writeLock().lock(); + outputs.remove(output); + outputsAccess.writeLock().unlock(); + } + + void pushDictionary(OutputManager output) { + dictionaryAccess.readLock().lock(); + for (Map.Entry entry : dictionary.entrySet()) { + output.pushLog(this, getTimeSource().getMonoUTCNanos(), formatKeyValue(entry.getKey(), entry.getValue())); + } + dictionaryAccess.readLock().unlock(); + } + + Collection getOutputs() { + outputsAccess.readLock().lock(); + ArrayList outputManagers = new ArrayList<>(outputs); + outputsAccess.readLock().unlock(); + return outputManagers; + } + + // Outputs access - Notify + + private String formatKeyValue(String key, String value) { + String separator = ","; + final String key2 = key.replace("\\", "\\\\").replace(separator, "\\s"); + final String value2 = value.replace("\\", "\\\\").replace(separator, "\\s"); + return key2 + separator + value2; + } + + public void putKeyValue(String key, String value) { + dictionaryAccess.writeLock().lock(); + String old = dictionary.put(key, value); + dictionaryAccess.writeLock().unlock(); + if (listened && !value.equals(old)) { + pushLog(getTimeSource().getMonoUTCNanos(), formatKeyValue(key, value)); + } + } + + public void pushValue(long time, double value) { + pushValue(time, new double[] { value }); + } + + public void pushValue(long time, double[] value) { + // Shouldn't be called before onCreateAndAdded + if (listened) { + pushValueInner(time, value); + } + if (reactive) { + holdTimestamp = time; + holdValue = value; + } + } + + public void pushLog(long time, String message) { + // Shouldn't be called before onCreateAndAdded + if (listened) { + pushLogInner(time, message); + } + if (reactive) { + holdTimestampLog = time; + holdValueLog = message; + } + } + + private void pushValueInner(long time, double[] value) { + outputsAccess.readLock().lock(); + for (OutputManager output : outputs) + if (output.isEnabled()) + output.pushValue(this, time, value); + outputsAccess.readLock().unlock(); + } + + private void pushLogInner(long time, String message) { + outputsAccess.readLock().lock(); + for (OutputManager output : outputs) + if (output.isEnabled()) + output.pushLog(this, time, message); + outputsAccess.readLock().unlock(); + } + + // Muting + + public boolean isMuted() { + return !listened; + } + + public void mute() { + this.listened = false; + } + + public void unmute() { + this.listened = true; + if (reactive) { + pushValueInner(holdTimestamp, holdValue); + } + } + + // Gets + + public boolean isReactive() { + return reactive; + } + + public static TimeSource getTimeSource() { + return time; + } + + public InputGroup getParent() { + return parent; + } + + public final List getHeader(){ + return header; + } + + // Gets - SFPlugin non-final Overrides + + @Override + public final String getName() { + InputGroup parent = getParent(); + return parent != null ? parent.getName() + "/" + getSimpleName() : getSimpleName(); + } + + // Gets - InputGroup final Overrides + + @Override + public final String getSimpleName() { + return name; + } + + @Override + public final Collection getChildren() { + return Collections.singletonList(this); + } + + // Finalization + + public void close() { + outputsAccess.writeLock().lock(); + outputs.clear(); + outputsAccess.writeLock().unlock(); + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/InputGroup.java b/src/main/java/eu/fbk/mpba/sensorflow/InputGroup.java new file mode 100644 index 00000000..ea2a709d --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/InputGroup.java @@ -0,0 +1,27 @@ +package eu.fbk.mpba.sensorflow; + +import java.util.Collection; + +public interface InputGroup extends SFPlugin { + + /** + * Called when the input plugin is added to SensorFlow but not wired yet. This is the place + * where to finalize the setup of the Streams. + */ + void onCreate(); + + /** + * Called after routing ("wiring") the plugin to all the outputs. + */ + void onAdded(); + + /** + * Called when the plugin is removed from SensorFlow + */ + void onRemoved(); + + Collection getChildren(); + + String getSimpleName(); + +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/InputManager.java b/src/main/java/eu/fbk/mpba/sensorflow/InputManager.java new file mode 100644 index 00000000..014891a0 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/InputManager.java @@ -0,0 +1,72 @@ +package eu.fbk.mpba.sensorflow; + +class InputManager { + private InputObserver manager = null; + private PluginStatus status = PluginStatus.INSTANTIATED; + private InputGroup inputGroup; + + InputManager(InputGroup inputGroup, InputObserver manager) { + this.inputGroup = inputGroup; + this.manager = manager; + changeStatus(PluginStatus.INSTANTIATED); + } + + private void changeStatus(PluginStatus s) { + if (manager != null) + manager.inputStatusChanged(this, status = s); + } + + // Events + + void onCreate() { + switch (status) { + case INSTANTIATED: + inputGroup.onCreate(); + changeStatus(PluginStatus.CREATED); + break; + default: + throw new UnsupportedOperationException("onCreate out of place, current status: " + status.toString()); + } + } + + void onAdded() { + switch (status) { + case CREATED: + inputGroup.onAdded(); + changeStatus(PluginStatus.ADDED); + break; + default: + throw new UnsupportedOperationException("onAdded out of place, current status: " + status.toString()); + } + } + + void onRemovedAndClose() { + switch (status) { + case ADDED: + inputGroup.onRemoved(); + changeStatus(PluginStatus.REMOVED); + case CREATED: + case REMOVED: + inputGroup.onClose(); + changeStatus(PluginStatus.CLOSED); + break; + default: + throw new UnsupportedOperationException("onRemovedAndClose out of place, current status: " + status.toString()); + } + } + + // Getters + + Iterable getInputs() { + return inputGroup.getChildren(); + } + +// PluginStatus getStatus() { +// return status; +// } + + InputGroup getInputGroup() { + return inputGroup; + } + +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/InputObserver.java b/src/main/java/eu/fbk/mpba/sensorflow/InputObserver.java new file mode 100644 index 00000000..4fba3c0b --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/InputObserver.java @@ -0,0 +1,5 @@ +package eu.fbk.mpba.sensorflow; + +interface InputObserver { + void inputStatusChanged(InputManager input, PluginStatus state); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/Log.java b/src/main/java/eu/fbk/mpba/sensorflow/Log.java new file mode 100644 index 00000000..b476d64d --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/Log.java @@ -0,0 +1,31 @@ +package eu.fbk.mpba.sensorflow; + +public class Log { + private static long ord = 1_000_000_000L; + public static boolean enabled = false; + public static void l(Object text) { + if (enabled) { + StackTraceElement x = Thread.currentThread().getStackTrace()[2]; + String b = + String.valueOf(ord++) + + " " + + Thread.currentThread().getId() + + " " + + x.getMethodName() + + " (" + + x.getFileName() + + ":" + + x.getLineNumber() + + "): " + + text; + System.out.println(b); + } + } + public static void l() { + l(""); + } + public static void s() { + System.out.println(Thread.currentThread().getId() + " " + Thread.currentThread().getName()); + new Throwable().printStackTrace(); + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/Output.java b/src/main/java/eu/fbk/mpba/sensorflow/Output.java new file mode 100644 index 00000000..4c1601a9 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/Output.java @@ -0,0 +1,28 @@ +package eu.fbk.mpba.sensorflow; + +public interface Output extends SFPlugin { + + void onCreate(String sessionId); + + void onInputAdded(Input input); + + void onInputRemoved(Input input); + + /** + * This method is called when a new value vector is available to be used, transmitted or + * persisted. + * @param input Input that generated the data. + * @param timestamp Generation timestamp of the data. + * @param value The data. + */ + void onValue(Input input, long timestamp, double[] value); + + /** + * This method is called when a new log message is available to be used, transmitted or + * persisted. This method can be overridden to skip the parsing of the log type and tag. + * @param input Input that generated the log. + * @param timestamp Timestamp of the generation of the message + * @param text String encoding the type, the tag and the message. + */ + void onLog(Input input, long timestamp, String text); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/OutputBuffer.java b/src/main/java/eu/fbk/mpba/sensorflow/OutputBuffer.java new file mode 100644 index 00000000..81d085f8 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/OutputBuffer.java @@ -0,0 +1,290 @@ +package eu.fbk.mpba.sensorflow; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +class OutputBuffer implements Output { + + /* + nulls encoding + flows NotNull + null doubles AND null events --> schema-event + null doubles --> event + null events --> values + */ + + private final long INPUT_ADDED = 0L; + private final long INPUT_REMOVED = -1L; + + private final Input[] flows; + private final long[] longs; + private final double[][] doubles; + private final String[] strings; + + private int takeIndex; + private int putIndex; + private int count; + + private final Output output; + + private final ReentrantLock lock; + + private final Condition notEmpty; + + private final Condition notFull; + + // Internal helper methods + + private void enqueued() { + if (++putIndex == flows.length) + putIndex = 0; + count++; + notEmpty.signal(); + } + + private void enqueue(Input f, long time, double[] v) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + flows[putIndex] = f; + longs[putIndex] = time; + doubles[putIndex] = v; + enqueued(); + } + + private void enqueue(Input f, long time, String message) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + flows[putIndex] = f; + longs[putIndex] = time; + strings[putIndex] = message; + enqueued(); + } + + private void enqueue(Input f, long added) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + flows[putIndex] = f; + longs[putIndex] = added; + enqueued(); + } + + private void dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + // cache optimization: --count == 0 --> init ==> improved locality on low queue usage + count--; + if (count == 0 || ++takeIndex == flows.length) + takeIndex = 0; + if (count == 0) + putIndex = 0; + notFull.signal(); + } + + OutputBuffer(Output drain, int capacity, boolean fair) { + if (drain == null) + throw new NullPointerException(); + if (capacity <= 0) + throw new IllegalArgumentException(); + this.output = drain; + this.flows = new Input[capacity]; + this.longs = new long[capacity]; + this.doubles = new double[capacity][]; + this.strings = new String[capacity]; + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); + } + + // Output interface + + @Override + public void onValue(Input f, long t, double[] v) { + if (v == null) + throw new NullPointerException("No support for null data"); + final ReentrantLock lock = this.lock; + try { + lock.lockInterruptibly(); + try { + while (count == flows.length) + notFull.await(); + enqueue(f, t, v); + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onLog(Input f, long t, String v) { + if (v == null) + throw new IllegalArgumentException("No support for null logs"); + + final ReentrantLock lock = this.lock; + try { + lock.lockInterruptibly(); + try { + while (count == flows.length) + notFull.await(); + enqueue(f, t, v); + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onInputAdded(Input f) { + final ReentrantLock lock = this.lock; + try { + lock.lockInterruptibly(); + try { + while (count == flows.length) + notFull.await(); + enqueue(f, INPUT_ADDED); + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onInputRemoved(Input f) { + final ReentrantLock lock = this.lock; + try { + lock.lockInterruptibly(); + try { + while (count == flows.length) + notFull.await(); + enqueue(f, INPUT_REMOVED); + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getName() { + return output.getName(); + } + + @Override + public void onCreate(String sessionId) { + output.onCreate(sessionId); + } + + @Override + public void onClose() { + output.onClose(); + } + + // Flushing + + public void pollToHandler(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + Input f; + long t; + boolean isVal; + boolean isLog; + double[] d = null; + String s = null; +// pollw = -System.nanoTime(); + lock.lockInterruptibly(); +// pollw += System.nanoTime(); +// pollwAvg *= .9; +// pollwAvg += .1 * pollw; + try { + while (count == 0) { + if (nanos > 0) { + nanos = notEmpty.awaitNanos(nanos); + } else { + return; + } + } + f = flows[takeIndex]; + flows[takeIndex] = null; + t = longs[takeIndex]; + isVal = doubles[takeIndex] != null; + isLog = strings[takeIndex] != null; + if (isVal) { + // Value + d = doubles[takeIndex]; + doubles[takeIndex] = null; + } + if (isLog) { + // Log + s = strings[takeIndex]; + strings[takeIndex] = null; + } + dequeue(); + } finally { + lock.unlock(); + } + if (isVal) { + // Is Value + output.onValue(f, t, d); + } else if (isLog) { + // Is Log + output.onLog(f, t, s); + } else { + // Is Schema Event + if (t == INPUT_ADDED) + output.onInputAdded(f); + else + output.onInputRemoved(f); + } + } + + // Control + + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return count; + } finally { + lock.unlock(); + } + } + + public int remainingCapacity() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return flows.length - count; + } finally { + lock.unlock(); + } + } + + public void clear() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int k = count; + if (k > 0) { + takeIndex = putIndex; + count = 0; + for (; k > 0 && lock.hasWaiters(notFull); k--) + notFull.signal(); + } + } finally { + lock.unlock(); + } + } + + // Gets + + public Output getHandler() { + return output; + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/OutputManager.java b/src/main/java/eu/fbk/mpba/sensorflow/OutputManager.java new file mode 100644 index 00000000..eb2d2ef5 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/OutputManager.java @@ -0,0 +1,206 @@ +package eu.fbk.mpba.sensorflow; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +class OutputManager { + private OutputObserver manager = null; + + private boolean stopPending = false; + private PluginStatus status = PluginStatus.INSTANTIATED; + private String sessionTag; + private Output output; + private boolean threaded; + private Set linkedInputs = new HashSet<>(); + private Set linkedInputsSnapshot = new HashSet<>(); + + private Output queue; + private volatile boolean enabled = true; + + OutputManager(Output output, OutputObserver manager, boolean buffered) { + this.output = output; + this.manager = manager; + this.threaded = buffered; + if (buffered || output.getClass().isAnnotationPresent(SingleThreadRequired.class)) + queue = new OutputBuffer(this.output, 800, false); + else + queue = this.output; + setEnabled(false); + changeStatus(PluginStatus.INSTANTIATED); + } + + private Thread sbufferingThread = new Thread(new Runnable() { + @Override + public void run() { + OutputBuffer queue = (OutputBuffer) OutputManager.this.queue; + while (!stopPending || queue.size() > 0) { + try { + queue.pollToHandler(200, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { + } + } + } + }); + + private void beforeDispatch() { + // Create plugin + output.onCreate(sessionTag); + changeStatus(PluginStatus.CREATED); + // Here input add/removal differentially buffered + // Enable plugin + // Here every linked input will be added + // This call is sync: mutexes with addInput and removeInput + setEnabled(true); + // Here input add/removal in queue + } + + private void afterDispatch() { + // Here input add/removal differentially buffered, but no more useful + for (Input input : linkedInputsSnapshot) { + output.onInputRemoved(input); + } + output.onClose(); + changeStatus(PluginStatus.CLOSED); + } + + private void changeStatus(PluginStatus s) { + if (manager != null) + manager.outputStatusChanged(this, status = s); + } + + // Implemented + + void onCreateAndAdded(String sessionTag) { + if (status == PluginStatus.INSTANTIATED) { + this.sessionTag = sessionTag; + beforeDispatch(); + if (threaded) + sbufferingThread.start(); + for (Input i : linkedInputs) + i.pushDictionary(this); + } else + throw new UnsupportedOperationException("onCreateAndAdded out of place, current status: " + status.toString()); + } + + void onStopAndClose() { + if (status == PluginStatus.CREATED && !stopPending) { + setEnabled(false); + stopPending = true; + if (threaded) + try { + sbufferingThread.interrupt(); // stopPending == true + sbufferingThread.join(); // Max time specified in queue pollToHandler call + } catch (InterruptedException e) { + e.printStackTrace(); + } + afterDispatch(); + } else + throw new UnsupportedOperationException("onCreateAndAdded out of place, current status: " + status.toString() + + ", stopPending: " + stopPending); + } + + void pushLog(Input sensor, long time, String message) { + try { + queue.onLog(sensor, time, message); + } catch (RuntimeException e) { + if (e.getCause() instanceof InterruptedException) + System.out.println("InterruptedException 9234rhyu1"); + else throw e; + } + } + + void pushValue(Input sensor, long time, double[] value) { + try { + queue.onValue(sensor, time, value); + } catch (RuntimeException e) { + if (e.getCause() instanceof InterruptedException) + System.out.println("InterruptedException 9234rhyu2"); + else throw e; + } + } + + // Setters + + synchronized void addInput(Input s) { + if (linkedInputs.add(s) && enabled) + try { + queue.onInputAdded(s); + } catch (RuntimeException e) { + if (e.getCause() instanceof InterruptedException) + System.out.println("InterruptedException 9234rhyu3"); + else throw e; + } + } + + synchronized void removeInput(Input s) { + if (linkedInputs.remove(s) && enabled) + try { + queue.onInputRemoved(s); + } catch (RuntimeException e) { + if (e.getCause() instanceof InterruptedException) + System.out.println("InterruptedException 9234rhyu4"); + else throw e; + } + } + + synchronized void setEnabled(boolean enabled) { + this.enabled = enabled; + if (!enabled) { + linkedInputsSnapshot.clear(); + linkedInputsSnapshot.addAll(linkedInputs); + } else { + HashSet toRemove = new HashSet<>(linkedInputsSnapshot); + toRemove.removeAll(linkedInputs); + HashSet toAdd = new HashSet<>(linkedInputs); + toAdd.removeAll(linkedInputsSnapshot); + + linkedInputs.clear(); + linkedInputs.addAll(linkedInputsSnapshot); + + for (Input s : toRemove) + removeInput(s); + for (Input s : toAdd) + addInput(s); + } + } + + // Getters + + Collection getInputs() { + ArrayList copy = new ArrayList<>(linkedInputs); + return Collections.unmodifiableCollection(copy); + } + + boolean isEnabled() { + return enabled; + } + + PluginStatus getStatus() { + return status; + } + + Output getOutput() { + return output; + } + + // Finalization + + public synchronized void close() { + if (output != null) { + output.onClose(); + output = null; + } + linkedInputs.clear(); + linkedInputs = null; + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } +} \ No newline at end of file diff --git a/src/main/java/eu/fbk/mpba/sensorflow/OutputObserver.java b/src/main/java/eu/fbk/mpba/sensorflow/OutputObserver.java new file mode 100644 index 00000000..fea10eb0 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/OutputObserver.java @@ -0,0 +1,5 @@ +package eu.fbk.mpba.sensorflow; + +interface OutputObserver { + void outputStatusChanged(OutputManager sender, PluginStatus status); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/PluginStatus.java b/src/main/java/eu/fbk/mpba/sensorflow/PluginStatus.java new file mode 100644 index 00000000..086406d2 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/PluginStatus.java @@ -0,0 +1,5 @@ +package eu.fbk.mpba.sensorflow; + +public enum PluginStatus { + INSTANTIATED, CREATED, ADDED, REMOVED, CLOSED +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/SFPlugin.java b/src/main/java/eu/fbk/mpba/sensorflow/SFPlugin.java new file mode 100644 index 00000000..4e402195 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/SFPlugin.java @@ -0,0 +1,6 @@ +package eu.fbk.mpba.sensorflow; + +public interface SFPlugin { + String getName(); + void onClose(); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/SensorFlow.java b/src/main/java/eu/fbk/mpba/sensorflow/SensorFlow.java new file mode 100644 index 00000000..c3ce51bf --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/SensorFlow.java @@ -0,0 +1,361 @@ +package eu.fbk.mpba.sensorflow; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; + +public class SensorFlow { + + // Fields + + private final String sessionTag; + private volatile boolean closed = false; + + private final Map userInputs = new TreeMap<>(); + private final Map userOutputs = new HashMap<>(); + + // Status Interfaces + + private final InputObserver input = new InputObserver() { + @Override + public void inputStatusChanged(InputManager sender, PluginStatus state) { + // onStatusChanged + } + }; + + private final OutputObserver output = new OutputObserver() { + @Override + public void outputStatusChanged(OutputManager sender, PluginStatus state) { + // onStatusChanged + } + }; + + // Data and Events Interface, will be re-added for profiling + + // Engine implementation + + public SensorFlow() { + this(new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())); + } + + public SensorFlow(String sessionTag) { + this.sessionTag = sessionTag; + } + + public String getSessionTag() { + return sessionTag; + } + + // Plugins + + private SensorFlow add(InputGroup p, boolean routedEverywhere) { + InputManager added = null; + // Check if only the name is already contained + synchronized (userInputs) { + if (!userInputs.containsKey(p.getName())) { + added = new InputManager(p, this.input); + userInputs.put(p.getName(), added); + } + } + if (added != null) { + // InputGroups are not recursive, just one level +// p.setManager(this.flow); + added.onCreate(); + if (routedEverywhere) + routeAll(added); + added.onAdded(); + } + return this; + } + + public SensorFlow add(InputGroup p) { + return add(p, true); + } + + public SensorFlow add(Collection p) { + for (InputGroup inputGroup : p) + add(inputGroup); + return this; + } + + public SensorFlow addNotRouted(InputGroup p) { + return add(p, false); + } + + public SensorFlow addNotRouted(Collection p) { + for (InputGroup p1 : p) + addNotRouted(p1); + return this; + } + + private void add(Output p, boolean threaded, boolean routeEverywhere) { + OutputManager added = null; + // Check if only the name is already contained + synchronized (userOutputs) { + if (!userOutputs.containsKey(p)) { + userOutputs.put(p, added = new OutputManager(p, this.output, threaded)); + } + } + if (added != null) { + // Reversed the following two statements: + // Before: onCreate call precedes all data + // Now: some data after onCreate call may be lost. + // It is ok as the important rule is that onAdded call precedes all data. + added.onCreateAndAdded(sessionTag); + if (routeEverywhere) + routeAll(added); + } + } + + public SensorFlow add(Output p) { + add(p, true, true); + return this; + } + + public SensorFlow addNotRouted(Output p) { + add(p, true, false); + return this; + } + + public SensorFlow addInThread(Output p) { + add(p, false, true); + return this; + } + + public SensorFlow addInThreadNotRouted(Output p) { + add(p, false, false); + return this; + } + + public SensorFlow remove(InputGroup p) { + InputManager removed = null; + // Check if only the name is already contained + synchronized (userInputs) { + if (userInputs.containsKey(p.getName())) { + removed = userInputs.remove(p.getName()); + } + } + if (removed != null) { + // InputGroups are not recursive, just one level + for (Input s : p.getChildren()) { + ArrayList outputs = new ArrayList<>(s.getOutputs()); + for (OutputManager o : outputs) + removeRoute(s, o); + } + removed.onRemovedAndClose(); +// p.setManager(null); + } + return this; + } + + public SensorFlow remove(Output p) { + OutputManager removed = null; + // Check if only the name is already contained + synchronized (userOutputs) { + if (userOutputs.containsKey(p)) + removed = userOutputs.remove(p); + } + if (removed != null) { + final OutputManager o = removed; + ArrayList inputs = new ArrayList<>(o.getInputs()); + for (Input i : inputs) + removeRoute(i, o); + o.onStopAndClose(); + } + return this; + } + + public SensorFlow addRoute(Input from, Output to) { + if (from != null && to != null) { + OutputManager outMan; + synchronized (userOutputs) { + outMan = userOutputs.get(to); + } + addRoute(from, outMan); + } + return this; + } + + public SensorFlow removeRoute(Input from, Output to) { + if (from != null && to != null) { + OutputManager outMan; + synchronized (userOutputs) { + outMan = userOutputs.get(to); + } + removeRoute(from, outMan); + } + return this; + } + + public boolean isRouted(Input from, Output to) { + if (from != null && to != null) { + OutputManager outMan; + synchronized (userOutputs) { + outMan = userOutputs.get(to); + } + return isRouted(from, outMan); + } + return false; + } + + private void addRoute(Input from, OutputManager outMan) { + outMan.addInput(from); + from.addOutput(outMan); + } + + private void removeRoute(Input fromSensor, OutputManager outMan) { + fromSensor.removeOutput(outMan); + outMan.removeInput(fromSensor); + } + + // Concurrently unsafe + private boolean isRouted(Input fromSensor, OutputManager outMan) { + return fromSensor.getOutputs().contains(outMan) && outMan.getInputs().contains(fromSensor); + } + + public SensorFlow enableOutput(Output o) { + return setOutputEnabled(true, o); + } + + public SensorFlow disableOutput(Output o) { + return setOutputEnabled(false, o); + } + + private SensorFlow setOutputEnabled(boolean enabled, Output o) { + synchronized (userOutputs) { + if (userOutputs.containsKey(o)) { + (userOutputs.get(o)).setEnabled(enabled); + } else + throw new IllegalArgumentException("Output not added."); + } + return this; + } + + // Gets + + public boolean isOutputEnabled(Output o) { + boolean b; + synchronized (userOutputs) { + b = userOutputs.containsKey(o) && (userOutputs.get(o)).isEnabled(); + } + return b; + } + +// public InputGroup getInput(String name) { +// InputManager r; +// synchronized (userInputs) { +// r = userInputs.get(name); +// } +// return r == null ? null : r.getInputGroup(); +// } +// +// public Output getOutput(String name) { +// Object r; +// synchronized (userInputs) { +// r = userOutputs.get(name); +// } +// return r == null ? null : ((OutputManager)r).getOutput(); +// } + + public Collection getInputs() { + ArrayList x; + synchronized (userInputs) { + x = new ArrayList<>(userInputs.size()); + for (InputManager o : userInputs.values()) + x.add(o.getInputGroup()); + } + return Collections.unmodifiableCollection(x); + } + + public Collection getOutputs() { + ArrayList x; + synchronized (userOutputs) { + x = new ArrayList<>(userOutputs.size()); + for (OutputManager o : userOutputs.values()) + x.add(o.getOutput()); + } + return Collections.unmodifiableCollection(x); + } + + // Engine operation + + // Does not create duplicates + public SensorFlow routeClear() { + // REMOVE ALL + for (InputManager d : userInputs.values()) + for (Input s : d.getInputs()) // FOREACH SENSOR + for (OutputManager o : userOutputs.values()) // Remove LINK TO EACH OUTPUT + removeRoute(s, o); + return this; + } + + // Does not create duplicates + public SensorFlow routeAll() { + // SENSORS x OUTPUTS + for (InputManager d : userInputs.values()) + routeAll(d); + return this; + } + + private SensorFlow routeAll(InputManager d) { + for (Input s : d.getInputs()) // FOREACH SENSOR + for (OutputManager o : userOutputs.values()) // LINK TO EACH OUTPUT + addRoute(s, o); + return this; + } + + private SensorFlow routeAll(OutputManager o) { + // SENSORS x OUTPUTS + for (InputManager d : userInputs.values()) + for (Input s : d.getInputs()) // FOREACH SENSOR + addRoute(s, o); + return this; + } + +// // Does not create duplicates +// public SensorFlow routeNthToNth() { +// // max SENSORS, OUTPUTS +// int maxi = Math.max(userInputs.size(), userOutputs.size()); +// for (int i = 0; i < maxi; i++) // FOREACH OF THE LONGEST +// for (Input s : new ArrayList<>(userInputs.values()).get(i % userInputs.size()).getInputs()) // LINK MODULE LOOPING ON THE SHORTEST +// addRoute(s, new ArrayList<>(userOutputs.values()).get(i % userOutputs.size())); +// return this; +// } + +// private void changeStatus(Status status) { +// this.closed = status; +// } + +// public Status getStatus() { +// return closed; +// } + + public synchronized void close() { + if (!closed) { + for (InputManager d : userInputs.values()) { +// for (Input s : d.getInputGroup().getChildren()) { +// s.onRemoved(); +// s.onClose(); +// } + d.getInputGroup().onRemoved(); + d.getInputGroup().onClose(); + } + routeClear(); + for (OutputManager o : userOutputs.values()) + o.onStopAndClose(); + closed = true; + } + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/SingleThreadRequired.java b/src/main/java/eu/fbk/mpba/sensorflow/SingleThreadRequired.java new file mode 100644 index 00000000..80f17c98 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/SingleThreadRequired.java @@ -0,0 +1,8 @@ +package eu.fbk.mpba.sensorflow; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) +public @interface SingleThreadRequired { +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/TimeSource.java b/src/main/java/eu/fbk/mpba/sensorflow/TimeSource.java new file mode 100644 index 00000000..e9f763ec --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/TimeSource.java @@ -0,0 +1,6 @@ +package eu.fbk.mpba.sensorflow; + +public interface TimeSource { + long getMonoUTCNanos(); + long getMonoUTCNanos(long realTimeNanos); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/DeviceDetector.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/DeviceDetector.java new file mode 100644 index 00000000..fb5e9f6d --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/DeviceDetector.java @@ -0,0 +1,153 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collection; + +/** + * This interface define methods to recognize bluetooth device properties and type without + * connecting to it. + * + * Each WirelessDevice should have a DeviceDetector that says if a given device is of that + * WirelessDevice type. + * + * To return a Result, use the methods {isNot(), canBe(), shouldBe(), is()}. + */ + +public abstract class DeviceDetector { + private final Class type; + + public DeviceDetector(Class type) { + this.type = type; + } + + /** + * As some manufacturers do not provide clear ways to recognize a device based on its properties + * (without connecting to it) this enumeration lists three classes of confidence over the + * results returned. + * + * See constants documentation for further information. + */ + public enum Confidence { + /** + * The highest level of confidence. This level should be such that at most one DeviceDetector + * recognizes a remote device as of its type. + */ + IS(4), + /** + * A middle level where the device in analysis has a good chance to be of the reported type. + * Higher levels o confidence have priority. + */ + SHOULD_BE(2), + /** + * Last level of confidence. This should be used where the device has no way of being recognised + * other than Bluetooth version. + */ + CAN_BE(1), + /** + * This constant indicates that surely the device is not of the current type. + */ + IS_NOT(0); + + int p; + Confidence(int p) { + this.p = p; + } + + /** + * Compares this confidence with another. + * @param o Another confidence. + * @return Returns true if this > o + */ + public boolean greaterThan(Confidence o) { + return this.p > o.p; + } + } + + /** + * Given a list of WirelessDevice classes implementing the static method "getDeviceDetector", + * finds the last SHOULD_BE or CAN_BE or the first IS + * @param filter Collection of Class types referring to the Wireless Devices to scan. + * @param i The object to classify. + * @return Returns the chosen DeviceDetector.Result. + */ + public static DeviceDetector.Result find(Collection> filter, Object i) { + DeviceDetector.Result found = null; + for (Class f : filter) { + DeviceDetector invoke; + try { + invoke = (DeviceDetector) f.getDeclaredMethod("getDeviceDetector").invoke(null); + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + e.printStackTrace(); + continue; + } + DeviceDetector.Result evaluate = invoke.evaluate(i); + if (found == null || evaluate.getConfidence().greaterThan(found.getConfidence())) + found = evaluate; + if (found.getConfidence() == DeviceDetector.Confidence.IS) + break; + } + return found; + } + + public Result isNot() { + return new Result(Confidence.IS_NOT, "", type); + } + + public Result canBe(String name) { + return new Result(Confidence.CAN_BE, name, type); + } + + public Result shouldBe(String name) { + return new Result(Confidence.SHOULD_BE, name, type); + } + + public Result is(String name) { + return new Result(Confidence.IS,name, type); + } + + /** + * This class contains the result of the evaluation ({@see evaluate()}). + */ + public static class Result { + private final Confidence c; + private final String identifier; + private Class type; + + private Result(Confidence c, String identifier, Class type) { + this.c = c; + this.identifier = identifier; + this.type = type; + this.type = type; + } + + /** + * Evaluation confidence. + * @return Returns Confidence.IS_NOT if the device is not of the current type, or one of + * Confidence.{{IS, SHOULD_BE, CAN_BE}} + */ + public Confidence getConfidence() { + return c; + } + + /** + * Identifier of the device to show to the user if the device is of the reported type. + */ + public String getDeviceIdentifier() { + return identifier; + } + + /** + * Type of the device detected. + */ + public Class getType() { + return type; + } + } + + /** + * This method evaluates if the deviceObject is of the current type and with which confidence. + * @param deviceObject An object representing the device. E.g. a BluetoothDevice + * @return The result of the evaluation. + */ + public abstract Result evaluate(Object deviceObject); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/IOutputModule.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/IOutputModule.java new file mode 100644 index 00000000..7d1ddf5b --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/IOutputModule.java @@ -0,0 +1,18 @@ +package eu.fbk.mpba.sensorflow.sense; + +import eu.fbk.mpba.sensorflow.Input; + +interface IOutputModule { + /** + * This method is called when a new log message is available to be used, transmitted or + * persisted. + * If the log is formatted, it will be split in code, tag and message, otherwise all the raw + * text will be put in message and type set to -1. + * @param input Input that generated the log. + * @param timestamp Timestamp of the generation of the message + * @param type Type of the message. + * @param tag Tag associated with the message. + * @param message the message. + */ + void onLog(Input input, long timestamp, int type, String tag, String message); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/InputModule.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/InputModule.java new file mode 100644 index 00000000..c67db3de --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/InputModule.java @@ -0,0 +1,60 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +import eu.fbk.mpba.sensorflow.Input; +import eu.fbk.mpba.sensorflow.InputGroup; +import eu.fbk.mpba.sensorflow.SFPlugin; + +/** + * Base class for an InputModule + */ +public abstract class InputModule extends Module { + private final ArrayList children = new ArrayList<>(); + private boolean added = false; + + /** + * Constructor of abstract class + */ + public InputModule() { } + + /** + * Adds a Stream to the InputModule. The Input must have an unique name within the device inputs. + * An Input can be added to the WirelessDevice scheme in any moment. + * Special Inputs may be already present. + * + * @param input The flow to add to the InputModule. + */ + protected void addStream(Stream input) { + if (!added) { + children.add(input); + } else { + throw new UnsupportedOperationException("Online streams changes not supported. Add streams before onAdded or after onRemoved."); + } + } + + protected Collection getStreams() { + return children; + } + + @Override + public final Collection getChildren() { + ArrayList sfPlugins = new ArrayList<>(super.getChildren()); + sfPlugins.addAll(children); + return Collections.unmodifiableCollection(sfPlugins); + } + + public abstract void onCreate(); + + public void onAdded() { + added = true; + } + + public void onRemoved() { + added = false; + } + + public abstract void onClose(); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/LogInput.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/LogInput.java new file mode 100644 index 00000000..902bff41 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/LogInput.java @@ -0,0 +1,44 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.util.Collections; + +import eu.fbk.mpba.sensorflow.Input; +import eu.fbk.mpba.sensorflow.InputGroup; + +public class LogInput extends Input { + + /** + * Log that can be child of both Inputs and Outputs + * @param parent The parent, preferably of this Input. + * @param name The name, preferably of the parent of this Input. + */ + LogInput(InputGroup parent, String name) { + super(parent, name, Collections.emptyList()); + } + + public void pushLog(int type, String tag, String message) { + super.pushLog(getTimeSource().getMonoUTCNanos(), + LogMessage.format(type, tag, message) + ); + } + + @Override + public void onCreate() { + + } + + @Override + public void onAdded() { + + } + + @Override + public void onRemoved() { + + } + + @Override + public void onClose() { + + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/LogMessage.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/LogMessage.java new file mode 100644 index 00000000..18881af3 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/LogMessage.java @@ -0,0 +1,50 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.util.Locale; + +class LogMessage { + private String text; + private int type; + private String tag; + + public LogMessage(String text) { + this.text = text; + } + + public String getText() { + return text; + } + + public int getType() { + return type; + } + + public String getTag() { + return tag; + } + + public LogMessage invoke() { + // URL escape just the '\t' char + String[] tokens = text.split("\t"); + type = -1; + tag = ""; + if (tokens.length > 1) { + type = Integer.decode(tokens[0]); + tag = tokens[1]; + if (tokens.length == 3) + text = tokens[2]; + } + + return this; + } + + static String format(int type, String tag, String message) { + // URL escape just the '\t' char + return String.format(Locale.ENGLISH, + "%d\t%s\t%s", + type, + tag.replace("\\", "\\\\").replace("\t", "\\t"), + message + ); + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/Module.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/Module.java new file mode 100644 index 00000000..b43bffb2 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/Module.java @@ -0,0 +1,131 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.util.ArrayList; +import java.util.Collection; + +import eu.fbk.mpba.sensorflow.Input; +import eu.fbk.mpba.sensorflow.InputGroup; +import eu.fbk.mpba.sensorflow.SFPlugin; + +/** + * Base class for every sense plugin. It implements a common interface + */ +public abstract class Module implements SFPlugin, InputGroup { + private final LogInput moduleLog; + private final StatusInput moduleStatus; + private final ArrayList sfChildren = new ArrayList<>(4); + private String simpleName; + private String configuration; + + /** + * Constructor of abstract class + */ + Module() { + simpleName = getClass().getSimpleName(); + moduleLog = new LogInput(this, getName()); + moduleStatus = new StatusInput(this, getName()); + addSFChild(moduleLog); + addSFChild(moduleStatus); + } + + void addSFChild(Input child) { + sfChildren.add(child); + } + + /** + * Returns the path of the module considering InputGroups. + */ + @Override + public final String getName() { + return getSimpleName(); + } + + /** + * Returns the name of the module as an acquisition-unique identification. It should be standard + * and well-known as it identifies the module. + */ + public final String getSimpleName() { + return simpleName; + } + + public final void setName(String simpleName) { + this.simpleName = simpleName == null ? getClass().getSimpleName() : simpleName; + moduleLog.setName(getName()); + } + + public final void setConfiguration(String configuration) { + this.configuration = configuration; + } + + /** + * Returns the configuration string set by the constructor. + * @return String, as-is. + */ + public String getConfiguration() { + return configuration; + } + + /** + * Custom logs from the device software/hardware. + * @param type Identification code of the log type, 0 for metadata (LOG_*) + * @param tag Tag for the log, can be seen as a sub-type or can be ignored. + * @param message String containing the log message + */ + protected void pushLog(int type, String tag, String message) { + moduleLog.pushLog(type, tag, message); + } + + public enum Status { + OFF, ERROR, WAIT, OK + } + + /** + * Updates the status of the Module. Produces also a log with code LOG_STATUS_UPATE and the + * status as tag, the message is empty. + * @param status New status to propagate. + */ + protected void pushStatus(Status status) { + pushLog(LOG_STATUS_UPDATE, status.name(), ""); + moduleStatus.pushStatus(status); + } + + public static final int LOG_METADATA = 201; + public static final int LOG_UI = 202; + public static final int LOG_FAILURE = 106; + public static final int LOG_E = 105; + public static final int LOG_W = 104; + public static final int LOG_D = 103; + public static final int LOG_I = 102; + public static final int LOG_V = 101; + public static final int LOG_STATUS_UPDATE = 64; + + protected void putKeyValue(String key, String value) { + moduleLog.putKeyValue(key, value); + } + + @Override + protected void finalize() throws Throwable { + onClose(); + super.finalize(); + } + + public static final String KEY_DEV_ID = "DEV_ID"; + public static final String KEY_DEV_INFO = "DEV_INFO"; + public static final String KEY_HW_VER = "DEV_HW"; + public static final String KEY_SW_VER = "DEV_SW"; + + + @Override + public void onCreate() { } + + @Override + public void onAdded() { } + + @Override + public void onRemoved() { } + + @Override + public Collection getChildren() { + return sfChildren; + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/ModuleFactory.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/ModuleFactory.java new file mode 100644 index 00000000..425cae1e --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/ModuleFactory.java @@ -0,0 +1,29 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +public class ModuleFactory { + public static T newInstance(Class klass, RequirementsProvider provider) { + Constructor[] constructors = klass.getConstructors(); + if (constructors.length > 0) { + Class[] parameterTypes = constructors[0].getParameterTypes(); + Object[] o = new Object[parameterTypes.length]; + for (int i = 0; i < parameterTypes.length; i++) { + o[i] = provider.get(parameterTypes[i]); + } + try { + return (T) constructors[0].newInstance(o); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + return null; + } + } + else + return null; + } + + public interface RequirementsProvider { + T get(Class requirement); + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/OutputModule.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/OutputModule.java new file mode 100644 index 00000000..84a0fc7b --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/OutputModule.java @@ -0,0 +1,95 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.util.Comparator; +import java.util.TreeSet; + +import eu.fbk.mpba.sensorflow.Input; +import eu.fbk.mpba.sensorflow.InputGroup; +import eu.fbk.mpba.sensorflow.Output; + +public abstract class OutputModule extends Module implements Output, IOutputModule { + + /** + * Constructor of abstract class + * + * @param name Name of the Module. + * @param settings Configuration string (e.g. json) to be passed to the Module. + */ + public OutputModule(String name, String settings) { + setName(name); + setConfiguration(settings); + } + + /** + * This method is called when a new log message is available to be used, transmitted or + * persisted. This method can be overridden to skip the parsing of the log type and tag. + * @param input Input that generated the log. + * @param timestamp Timestamp of the generation of the message + * @param text String encoding the type, the tag and the message. + */ + @Override + public void onLog(Input input, long timestamp, String text) { + LogMessage l = new LogMessage(text).invoke(); + onLog(input, timestamp, l.getType(), l.getTag(), l.getText()); + } + + /** + * This method is called when a new value vector is available to be used, transmitted or + * persisted. + * @param input Input that generated the data. + * @param timestamp Generation timestamp of the data. + * @param value The data. + */ + @Override + public abstract void onValue(Input input, long timestamp, double[] value); + + /** + * This method is called when a new log message is available to be used, transmitted or + * persisted. + * @param input Input that generated the log. + * @param timestamp Timestamp of the generation of the message + * @param type Type of the message. + * @param tag Tag associated with the message. + * @param message the message. + */ + @Override + public abstract void onLog(Input input, long timestamp, int type, String tag, String message); + + @Override + public abstract void onCreate(String sessionId); + + private TreeSet distinct = new TreeSet<>(new Comparator() { + @Override + public int compare(InputGroup o1, InputGroup o2) { + return o1.hashCode() - o2.hashCode(); + } + }); + + @Override + public void onInputAdded(Input input) { + InputGroup parent = input.getParent(); + if (parent != null + && parent instanceof InputModule + && distinct.add(parent)) { + onInputParentAdded(parent); + } + } + + @Override + public void onInputRemoved(Input input) { + InputGroup parent = input.getParent(); + if (parent != null + && parent instanceof InputModule + && distinct.remove(parent) + && !distinct.contains(parent)) { + onInputParentRemoved(parent); + } + } + + public void onInputParentAdded(InputGroup inputParent) { } + + public void onInputParentRemoved(InputGroup inputParent) { } + + @Override + public abstract void onClose(); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/ProcessingModule.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/ProcessingModule.java new file mode 100644 index 00000000..f530c96d --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/ProcessingModule.java @@ -0,0 +1,80 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.util.concurrent.atomic.AtomicBoolean; + +import eu.fbk.mpba.sensorflow.Input; +import eu.fbk.mpba.sensorflow.Output; + +public abstract class ProcessingModule extends InputModule implements Output, IOutputModule { + + private final AtomicBoolean created = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public final void onCreate(String sessionId) { + onCreate(); + } + + @Override + public final void onCreate() { + if (!created.getAndSet(true)) + onProcessingCreate(); + } + + @Override + public final void onClose() { + if (!closed.getAndSet(true)) + onProcessingClose(); + } + + /** + * This method is called when a new value vector is available to be used, transmitted or + * persisted. + * @param input Input that generated the data. + * @param timestamp Generation timestamp of the data. + * @param value The data. + */ + @Override + public abstract void onValue(Input input, long timestamp, double[] value); + + /** + * This method is called when a new log message is available to be used, transmitted or + * persisted. This method can be overridden to skip the parsing of the log type and tag. + * @param input Input that generated the log. + * @param timestamp Timestamp of the generation of the message + * @param text String encoding the type, the tag and the message. + */ + @Override + public void onLog(Input input, long timestamp, String text) { + LogMessage l = new LogMessage(text).invoke(); + onLog(input, timestamp, l.getType(), l.getTag(), l.getText()); + } + + /** + * This method is called when a new log message is available to be used, transmitted or + * persisted. + * @param input Input that generated the log. + * @param timestamp Timestamp of the generation of the message + * @param type Type of the message. + * @param tag Tag associated with the message. + * @param message the message. + */ + @Override + public abstract void onLog(Input input, long timestamp, int type, String tag, String message); + + public abstract void onProcessingCreate(); + + @Override + public abstract void onAdded(); + + @Override + public abstract void onRemoved(); + + @Override + public abstract void onInputAdded(Input input); + + @Override + public abstract void onInputRemoved(Input input); + + public abstract void onProcessingClose(); +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/StatusInput.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/StatusInput.java new file mode 100644 index 00000000..6d1db88d --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/StatusInput.java @@ -0,0 +1,42 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.util.Collections; + +import eu.fbk.mpba.sensorflow.Input; +import eu.fbk.mpba.sensorflow.InputGroup; + +public class StatusInput extends Input { + + /** + * Log that can be child of both Inputs and Outputs + * @param parent The parent, preferably of this Input. + * @param name The name, preferably of the parent of this Input. + */ + StatusInput(InputGroup parent, String name) { + super(parent, name, Collections.emptyList(), true); + } + + public void pushStatus(Module.Status status) { + super.pushLog(getTimeSource().getMonoUTCNanos(), status.name()); + } + + @Override + public void onCreate() { + + } + + @Override + public void onAdded() { + + } + + @Override + public void onRemoved() { + + } + + @Override + public void onClose() { + + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/Stream.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/Stream.java new file mode 100644 index 00000000..9d265c07 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/Stream.java @@ -0,0 +1,107 @@ +package eu.fbk.mpba.sensorflow.sense; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import eu.fbk.mpba.sensorflow.Input; +import eu.fbk.mpba.sensorflow.InputGroup; + +public class Stream extends Input { + + public static final List HEADER_WXYZ = Arrays.asList("w", "x", "y", "z"); + public static final List HEADER_XYZ = Arrays.asList("x", "y", "z"); + public static final List HEADER_VALUE = Collections.singletonList("value"); + public static final List HEADER_EMPTY = Collections.emptyList(); + + public static final String NAME_ACCELERATION = "Acceleration"; + public static final String NAME_TEMPERATURE = "Temperature"; + public static final String NAME_LOCATION = "Locaation"; + public static final String NAME_BVP = "BloodVolumePulse"; + public static final String NAME_EDA = "ElectroDermalActivity"; + public static final String NAME_ECG = "ElectroCardioGram"; + + public Stream(Collection header, String name) { + this(null, header, name, false); + } + + public Stream(Collection header, String name, boolean reactive) { + this(null, header, name, reactive); + } + + public Stream(InputGroup parent, Collection header, String name) { + this(parent, header, name, false); + } + + public Stream(InputGroup parent, Collection header, String name, boolean reactive) { + super(parent, name, header, reactive); + } + + public void pushValue(double value) { + super.pushValue(getTimeSource().getMonoUTCNanos(), new double[] { value }); + } + + /** + * Logs the quality of the signal with the synchronous time reference. + * @param timestamp Time reference to the signal. + * @param quality InputModule-dependent codification of the quality. + */ + public void pushQuality(long timestamp, String quality) { + pushLog(timestamp, 0, "quality", quality); + } + + /** + * Logs now the quality of the signal with the synchronous time reference. + * @param quality InputModule-dependent codification of the quality. + */ + public void pushQuality(String quality) { + pushLog(getTimeSource().getMonoUTCNanos(), 0, "quality", quality); + } + + /** + * Formatted log. + * @param timestamp Time reference of this log line. + * @param type Identification code of the log type + * @param tag Tag for the log, can be seen as a sub-type or can be ignored. + * @param message String containing the log message + */ + public void pushLog(long timestamp, int type, String tag, String message) { + // URL escape just the ':' char + super.pushLog(timestamp, + LogMessage.format(type, tag, message) + ); + } + + /** + * Raw log now. + * @param message String containing the raw log text + */ + public void pushLog(String message) { + pushLog(getTimeSource().getMonoUTCNanos(), message); + } + + /** + * Formatted log now. + * @param type Identification code of the log type + * @param tag Tag for the log, can be seen as a sub-type or can be ignored. + * @param message String containing the log message + */ + public void pushLog(int type, String tag, String message) { + pushLog(getTimeSource().getMonoUTCNanos(), type, tag, message); + } + + // May not be for the end developer + + @Override + public void onCreate() { } + + @Override + public void onAdded() { } + + @Override + public void onRemoved() { } + + @Override + public void onClose() { } +} diff --git a/src/main/java/eu/fbk/mpba/sensorflow/sense/WirelessDevice.java b/src/main/java/eu/fbk/mpba/sensorflow/sense/WirelessDevice.java new file mode 100644 index 00000000..ef6a7bf2 --- /dev/null +++ b/src/main/java/eu/fbk/mpba/sensorflow/sense/WirelessDevice.java @@ -0,0 +1,129 @@ +package eu.fbk.mpba.sensorflow.sense; + + +/** + * WirelessDevice enables by extension interacting with devices that have a battery, a wireless + * connection and may lose data. + * + * To notify such data, methods of the form protected boolean pushX() are provided. These control + * internal Flows that send the data in a standard form. + * + * Remember to implement the following static method: + * public static DeviceDetector getDeviceDetector() + */ +public abstract class WirelessDevice extends InputModule { + + private static ConnectionType connectionType = ConnectionType.OTHER; + private String deviceName; + + public ConnectionType getConnectionType() { + return connectionType; + } + + // Control of built-in flows + + /** + * Notifies a change in the remaining battery time (in seconds). Should be called at least once to notify that + * the device has a battery. + * @param seconds Estimated remaining time before power off due to low battery. This time, + * despite being in seconds, has neither to change every second nor to be monotonic. + */ + public void pushBatteryETA(double seconds) { + batteryETA.pushValue(seconds); + } + + /** + * Notifies a change in the battery State Of Charge (SOC %). Should be called at least once to notify that + * the device has a battery. + * @param percentage Value from 0 (Empty) to 100 Fully charged. Neither granularity, nor + * monotonicity requirements. + */ + public void pushBatterySOC(double percentage) { + batterySOC.pushValue(percentage); + } + + public void pushBatterySOC(double volts, double fullVoltage, double emptyVoltage) { + pushBatterySOC((volts - emptyVoltage) / (fullVoltage - emptyVoltage) * 100); + } + + /** + * Notifies a change in the status of the datalink connection. + */ + public void pushConnectionStatus(ConnectionStatus status) { + connection.pushLog(status.name()); + } + + /** + * Notifies a change in the physical wireless connection strength in percentage. + * The reference for the value is 0 (Unable to connect) to 100 (Excellent). + * @param percentage Percentage linearly indicating the quality or power of the connection + * where 0% is the complete loss of connection and 100% is the maximal + * power receivable. + */ + public void pushConnectionStrength(double percentage) { + connection.pushValue(percentage); + } + + /** + * Notifies a loss of data in the connection. + * @param bytes Indicative quantity of information lost. If the data is retransmitted due to + * some Transmission Control mechanism it has not to be considered lost. + */ + public void pushDataLoss(double bytes) { + dataLoss.pushValue(bytes); + } + + /** + * Called by the developer who uses this module to connect the device. The output flows from + * onConnectionStatus. + */ + public abstract void connect(Object device); + + /** + * Called by the developer who uses this module to disconnect the device. The output flows from + * onConnectionStatus. + */ + public void disconnect() { /* FIXME: abstract me */ } + + // OOP stuff + + private Stream batteryETA = new Stream(this, Stream.HEADER_VALUE, "battery-eta", true); + private Stream batterySOC = new Stream(this, Stream.HEADER_VALUE, "battery-soc", true); + private Stream connection = new Stream(this, Stream.HEADER_VALUE, "connection", true); + private Stream dataLoss = new Stream(this, Stream.HEADER_VALUE, "data-loss"); + { + addStream(batteryETA); + addStream(batterySOC); + addStream(connection); + addStream(dataLoss); + } + + public String getDeviceName() { + return deviceName; + } + + protected void setDeviceName(String deviceName) { + this.deviceName = deviceName; + } + + public enum ConnectionType { + BLUETOOTH_2, + BLUETOOTH_3, + BLUETOOTH_4, + WIFI, + OTHER + } + + public enum ConnectionStatus { + ACTION_NOT_SUPPORTED, + CONNECTED, + CONNECTING, + CONNECTION_DROPPED_NO_ACTION, + CONNECTION_DROPPED_RECONNECTING, + CONNECTION_FAILED, + DESTINATION_NOT_AVAILABLE, + DISCONNECTED, + WRONG_DESTINATION_TYPE, + NOT_SPECIFIED, + } +} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/FlowsMan.java b/src/main/java/eu/fbk/mpba/sensorsflows/FlowsMan.java deleted file mode 100644 index ceb7d524..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/FlowsMan.java +++ /dev/null @@ -1,599 +0,0 @@ -package eu.fbk.mpba.sensorsflows; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import eu.fbk.mpba.sensorsflows.base.DeviceStatus; -import eu.fbk.mpba.sensorsflows.base.EngineStatus; -import eu.fbk.mpba.sensorsflows.base.EventCallback; -import eu.fbk.mpba.sensorsflows.base.IDeviceCallback; -import eu.fbk.mpba.sensorsflows.base.IOutput; -import eu.fbk.mpba.sensorsflows.base.IOutputCallback; -import eu.fbk.mpba.sensorsflows.base.ISensorDataCallback; -import eu.fbk.mpba.sensorsflows.base.IUserInterface; -import eu.fbk.mpba.sensorsflows.base.OutputStatus; -import eu.fbk.mpba.sensorsflows.base.SensorStatus; - -/** - * FlowsMan is the class that represents the engine of the library. - * This is the only interface that the user should use. - * Implementation of the IUserInterface - * @param The type of the timestamp returned by the outputs (must be the same for every item). - * @param The type of the value returned by the devices (must be the same for every item). - */ -public class FlowsMan implements - IUserInterface, SensorComponent, OutputPlugin>, - IDeviceCallback>, - ISensorDataCallback, TimeT, ValueT>, - IOutputCallback { - - // Status Interface - - /** - * Not for the end-user. - * - * @param sender sender - * @param state arg - */ - @Override - public void deviceStatusChanged(NodeDecorator sender, DeviceStatus state) { - if (state == DeviceStatus.INITIALIZED) { - synchronized (_itemsToInitLock) { - if (_devicesToInit.contains(sender)) { - _devicesToInit.remove(sender); - if (_status == EngineStatus.PREPARING && _devicesToInit.isEmpty()) { - // POI Change point - _devicesToInit = null; - } - } - } - if (_outputsToInit == null) - // FIXME WARN User-code timestamp dependency in the output thread or son - changeStatus(EngineStatus.STREAMING); - } - // TODO 7 Manage the other states - } - - /** - * Not for the end-user. - * - * @param sender sender - * @param state arg - */ - @Override - public void outputStatusChanged(IOutput sender, OutputStatus state) { - if (state == OutputStatus.INITIALIZED) { - synchronized (_itemsToInitLock) { - if (_outputsToInit.contains(sender)) { - _outputsToInit.remove(sender); - if (_status == EngineStatus.PREPARING && _outputsToInit.isEmpty()) { - // POI Change point - _outputsToInit = null; - } - } - } - if (_devicesToInit == null) - // FIXME WARN User-code timestamp dependency in the output thread or son - changeStatus(EngineStatus.STREAMING); - } - // TODO 7 Manage the other states - } - - /** - * Not for the end-user. - * - * @param sender sender - * @param state arg - */ - @Override - public void sensorStatusChanged(SensorComponent sender, TimeT time, SensorStatus state) { - // TODO 3 Implement an 'internal device' with an 'internal sensor' for log utilities. - // The sensor has to send also an event on a status change. - } - - // Data and Events Interface - - /** - * Not for the end-user. - * The sensor calls this when it has a new value. - * - * @param sender sender - * @param time timestamp - * @param value value - */ - @Override - public void sensorValue(SensorComponent sender, TimeT time, ValueT value) { - if (sender.isListened() && !_paused) { - for (OutputDecorator o : sender.getOutputs()) { - if (o.isEnabled()) - //noinspection unchecked - o.sensorValue(sender, time, value); - } - } - } - - /** - * Not for the end-user. - * - * @param sender sender - * @param type event code - * @param message message text - */ - @Override - public void sensorEvent(SensorComponent sender, TimeT time, int type, String message) { - if (sender.isListened() && !_paused) { - for (OutputDecorator o : sender.getOutputs()) { - if (o.isEnabled()) - //noinspection unchecked - o.sensorEvent(sender, time, type, message); - } - } - } - - // no deviceEvent - // no outputEvent for now - - // Fields - - final String _emAlreadyRendered = "The engine is initialized. No inputs, outputs or links can be added now."; - final String _itemsToInitLock = "_itemsToInitLock"; - - private LinkMode _linkMode = LinkMode.PRODUCT; - private String sessionTag = ""; - protected EngineStatus _status = EngineStatus.STANDBY; - protected boolean _paused = false; - - // maybe key, value - - protected Map> _userDevices = new TreeMap<>(); - protected Map> _userOutputs = new TreeMap<>(); - - protected List _devicesToInit = new ArrayList<>(); // null - protected List _outputsToInit = new ArrayList<>(); // null - - protected EventCallback, SensorComponent, OutputPlugin> - , EngineStatus> _onStatusChanged = null; // null - protected EventCallback, DeviceStatus> _onDeviceStatusChanged = null; // null - protected EventCallback, OutputStatus> _onOutputStatusChanged = null; // null - - // Engine implementation - - /** - * Default constructor. - */ - public FlowsMan() { - changeStatus(EngineStatus.STANDBY); - } - - public String getSessionTag() { - return sessionTag; - } - - public void setSessionTag(String sessionTag) { - if (_status == EngineStatus.STANDBY) { - this.sessionTag = sessionTag; - } else - throw new UnsupportedOperationException(_emAlreadyRendered); - } - - // STANDBY inputs (proper) - - /** - * Adds a device to the enumeration, this is to be used before the {@code start} call, before the internal IO-map rendering. - * - * @param node Device to add. - */ - @Override - public void addInput(NodePlugin node) { - if (_status == EngineStatus.STANDBY) { - // Check if only the name is already contained - if (!_userDevices.containsKey(node.getName())) { - _userDevices.put(node.getName(), new NodeDecorator<>(node, this)); - for (SensorComponent s : node.getSensors()) - s.registerManager(this); - } - } else - throw new UnsupportedOperationException(_emAlreadyRendered); - } - - @Override - public NodePlugin getInput(String name) { - Object r = _userDevices.get(name); - //noinspection unchecked - return r == null ? null : ((NodeDecorator)r).getPlugIn(); - } - - /** - * Adds a link between a sensor and an output (N to M relation) before the {@code start} call. - * - * @param fromSensor Input sensor retreived from a device. - * @param toOutput Output channel. - */ - @Override - public void addLink(SensorComponent fromSensor, OutputPlugin toOutput) { - // Manual indexOf for performance - for (OutputDecorator outMan : _userOutputs.values()) - if (toOutput == outMan.getPlugIn()) { // for reference, safe - addLink(fromSensor, outMan); - break; - } - } - - @Override - public void setOutputEnabled(boolean enabled, String name) { - if (_userOutputs.containsKey(name)) { - ((OutputDecorator)_userOutputs.get(name)).setEnabled(enabled); - } - } - - @Override - public boolean getOutputEnabled(String name) { - return _userOutputs.containsKey(name) && ((OutputDecorator)_userOutputs.get(name)).isEnabled(); - } - - /** - * Adds a link between a sensor and an output-decorator object (N to M relation) before the {@code start} call. - * - * @param fromSensor Input sensor retrieved from a device. - * @param outMan OutputDecorator object. - */ - void addLink(SensorComponent fromSensor, OutputDecorator outMan) { - if (_status == EngineStatus.STANDBY) { - fromSensor.addOutput(outMan); - outMan.addSensor(fromSensor); - } else - throw new UnsupportedOperationException(_emAlreadyRendered); - } - - /** - * Adds an output to the enumeration, this is to be used before the {@code start} call, before the internal in-out map rendering. - * - * @param output Output to add. - */ - @Override - public void addOutput(OutputPlugin output) { - if (_status == EngineStatus.STANDBY) { - // Check if only the name is already contained - if (!_userOutputs.containsKey(output.getName())) - _userOutputs.put(output.getName(), new OutputDecorator<>(output, this)); - } else - throw new UnsupportedOperationException(_emAlreadyRendered); - } - - @Override - public OutputPlugin getOutput(String name) { - Object r = _userOutputs.get(name); - //noinspection unchecked - return r == null ? null : ((OutputDecorator)r).getPlugIn(); - } - - // STANDBY aux gets (proper) - - /** - * Enumerates every Device managed. - * - * @return Enumerator usable trough a for (INode d : enumerator) - */ - @Override - public Iterable> getDevices() { - return new Iterable>() { - @Override - public Iterator> iterator() { - final Iterator> i = _userDevices.values().iterator(); - return new Iterator>() { - - @Override - public boolean hasNext() { - return i.hasNext(); - } - - @Override - public NodePlugin next() { - return i.next().getPlugIn(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Cannot remove objects from here."); - } - }; - } - }; - } - - /** - * Enumerates every Output managed. - * - * @return Enumerator usable trough a for (IOutput o : enumerator) - */ - @Override - public Iterable> getOutputs() { - return new Iterable>() { - @Override - public Iterator> iterator() { - final Iterator> i = _userOutputs.values().iterator(); - return new Iterator>() { - - @Override - public boolean hasNext() { - return i.hasNext(); - } - - @Override - public OutputPlugin next() { - return i.next().getPlugIn(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Cannot remove objects from here."); - } - }; - } - }; - } - - // Internal init and final management - - /** - * This method allows to initialize the device before the {@code start} call. - * Made private - * - * @param device {@code INode} to initializeNode - */ - void initialize(NodeDecorator device) { - // The connection state is checked before the start end callback. - //noinspection StatementWithEmptyBody - if (/*_decDevices.contains(device) && */device.getStatus() == DeviceStatus.NOT_INITIALIZED) { - device.initializeNode(); - } else { -// Log.w(LOG_TAG, "INode not NOT_INITIALIZED: " + device.toString()); - } - } - - /** - * This method allows to initializeNode the device before the {@code start} call. - * Made private - * - * @param output {@code IOutput} to finalize. - */ - void initialize(OutputDecorator output, String sessionName) { - //noinspection StatementWithEmptyBody - if (/*_decOutputs.contains(output) && */output.getStatus() == OutputStatus.NOT_INITIALIZED) { - output.initializeOutput(sessionName); - } else { -// Log.w(LOG_TAG, "IOutput not NOT_INITIALIZED: " + output.toString()); - } - } - - /** - * This method allows to finalize the device before the {@code close} call. - * Made private - * - * @param device {@code INode} to finalize. - */ - void finalize(NodeDecorator device) { - // The connection state is not checked - //noinspection StatementWithEmptyBody - if (/*_decDevices.contains(device) && */device.getStatus() == DeviceStatus.INITIALIZED) { - device.finalizeNode(); - } else { -// Log.w(LOG_TAG, "INode not INITIALIZED: " + device.toString()); - } - } - - /** - * This method allows to finalize the device before the {@code close} call. - * Made private - * - * @param output {@code IOutput} to finalize. - */ - void finalize(IOutput output) { - //noinspection StatementWithEmptyBody - if (/*_decOutputs.contains(output) && */output.getStatus() == OutputStatus.INITIALIZED) { - output.finalizeOutput(); - } else { -// Log.w(LOG_TAG, "IOutput not INITIALIZED: " + output.toString()); - } - } - - // Engine operation - - public void setLinkMode(LinkMode mode) { - if (_status == EngineStatus.STANDBY) - _linkMode = mode; - else - throw new UnsupportedOperationException(_emAlreadyRendered); - } - - /** - * Renders the IO-mapping and in two times (async.) initializes the devices and the outputs. - *

- * If a device/output was initialized before this call and it is not already INITIALIZED the - * engine will wait for it for an indefinite timestamp. In this period the engine status will stay - * {@code EngineStatus.PREPARING}. - *

- * The session name is the date-timestamp string {@code Long.toString(System.currentTimeMillis())} - * if the sessionTag has not been set. - */ - @SuppressWarnings("JavaDoc") - @Override - public void start() { - if (sessionTag == null || sessionTag.length() == 0) - sessionTag = Long.toString(System.currentTimeMillis()); - start(sessionTag); - } - - /** - * Renders the IO-mapping and in two times (async.) initializes the devices and the outputs. - *
- * If a device/output was initialized before this call and it is not already INITIALIZED the - * engine will wait for it for an indefinite time. In this period the engine status will stay - * {@code EngineStatus.PREPARING}. - *

- * Allows to give a name to the current session but it DOES NOT CHECK if it already exists. - */ - public void start(String sessionName) { - if (getStatus() == EngineStatus.STANDBY - || getStatus() == EngineStatus.CLOSED) { - // Prepares the links - switch (_linkMode) { - case PRODUCT: - // SENSORS x OUTPUTS - for (NodeDecorator d : _userDevices.values()) - for (SensorComponent s : d.getSensors()) // FOREACH SENSOR - for (OutputDecorator o : _userOutputs.values()) // LINK TO EACH OUTPUT - addLink(s, o); - break; - case NTH_TO_NTH: - // max SENSORS, OUTPUTS - int maxi = Math.max(_userDevices.size(), _userOutputs.size()); - for (int i = 0; i < maxi; i++) // FOREACH OF THE LONGEST - for (SensorComponent s : new ArrayList<>(_userDevices.values()).get(i % _userDevices.size()).getSensors()) // LINK MODULE LOOPING ON THE SHORTEST - addLink(s, new ArrayList<>(_userOutputs.values()).get(i % _userOutputs.size())); - break; - } - changeStatus(EngineStatus.PREPARING); - _devicesToInit.addAll(_userDevices.values()); - // Launches the initializations - for (NodeDecorator d : _userDevices.values()) { - // only if NOT_INITIALIZED: checked in the initializeNode method - initialize(d); - } - _outputsToInit.addAll(_userOutputs.values()); - for (OutputDecorator o : _userOutputs.values()) { - // only if NOT_INITIALIZED: checked in the initializeNode method - initialize(o, sessionName); - } - // WAS _outputsSensors.clear(); - // WAS _outputsSensors = null; - } else - throw new UnsupportedOperationException("Engine already running!"); - } - - /** - * Returns weather the global streaming is paused. - * - * @return Boolean value. - */ - @Override - public boolean isPaused() { - return _paused; - } - - /** - * Allows to pause or to resume the streaming in the faster way. - * - * @param paused Boolean value. - */ - @Override - public void setPaused(boolean paused) { - _paused = paused; - } - - protected void changeStatus(EngineStatus status) { - _status = status; - if (_onStatusChanged != null) - _onStatusChanged.handle(this, _status); - } - - /** - * Gets the status of the engine. - * - * @return The actual status of the engine. - */ - @Override - public EngineStatus getStatus() { - return _status; - } - - /** - * This method finalizes every device and every output and waits the queues to get empty. - */ - @Override - public void stop() { - changeStatus(EngineStatus.FINALIZING); - for (NodeDecorator d : _userDevices.values()) { - // only if INITIALIZED: checked in the method - finalize(d); - } - for (IOutput o : _userOutputs.values()) { - // only if INITIALIZED: checked in the method - finalize(o); - } - changeStatus(EngineStatus.FINALIZED); - } - - /** - * - */ - @Override - public void close() { - switch (getStatus()) { - case STANDBY: - break; - case STREAMING: - stop(); - case FINALIZED: - changeStatus(EngineStatus.CLOSING); - for (NodeDecorator d : _userDevices.values()) - for (SensorComponent s : d.getPlugIn().getSensors()) - s.close(); - for (IOutput o : _userOutputs.values()) - o.close(); - changeStatus(EngineStatus.CLOSED); - break; - case CLOSED: - break; - default: - throw new UnsupportedOperationException( - "Another operation is currently trying to chenge the state: " + - getStatus().toString()); - } - } - - /** - * Finalizes the object calling also the {@code close} method. - * - * @throws Throwable - */ - @Override - protected void finalize() throws Throwable { - close(); - // After, Object.finalize() - super.finalize(); - } - - /** - * Sets a listener to receive the engine state changes. - * - * @param callback Callback to call when the engine state changes. - */ - @Override - public void setOnStatusChanged(EventCallback, SensorComponent, OutputPlugin>, EngineStatus> callback) { - _onStatusChanged = callback; - } - - /** - * Sets a listener to receive every device's state change. - * - * @param callback Callback to call when any device's state changes. - */ - @Override - public void setOnDeviceStatusChanged(EventCallback, DeviceStatus> callback) { - _onDeviceStatusChanged = callback; - } - - /** - * Sets a listener to receive every output's state change. - * - * @param callback Callback to call when any device's state changes. - */ - @Override - public void setOnOutputStatusChanged(EventCallback, OutputStatus> callback) { - _onOutputStatusChanged = callback; - } -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/LinkMode.java b/src/main/java/eu/fbk/mpba/sensorsflows/LinkMode.java deleted file mode 100644 index 8b0bb915..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/LinkMode.java +++ /dev/null @@ -1,17 +0,0 @@ -package eu.fbk.mpba.sensorsflows; - -public enum LinkMode { - /** - * Uses the links specified through the {@code addLink} method. - */ - MANUAL, - /** - * Links each sensor to each output. - */ - PRODUCT, - /** - * Links each nth element of the longest collection between sensors and outputs to the nth - * element of the other. When the length is not the same the modulo operation is used. - */ - NTH_TO_NTH -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/NodeDecorator.java b/src/main/java/eu/fbk/mpba/sensorsflows/NodeDecorator.java deleted file mode 100644 index a2cbcad0..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/NodeDecorator.java +++ /dev/null @@ -1,57 +0,0 @@ -package eu.fbk.mpba.sensorsflows; - -import eu.fbk.mpba.sensorsflows.base.DeviceStatus; -import eu.fbk.mpba.sensorsflows.base.INode; - -/** - * This class adds internal support for the library data-paths. - */ -class NodeDecorator implements INode> { - private FlowsMan _manager = null; - private DeviceStatus _status = DeviceStatus.NOT_INITIALIZED; - private NodePlugin _nodePlugin; - - NodeDecorator(NodePlugin nodePlugin, FlowsMan manager) { - _nodePlugin = nodePlugin; - _manager = manager; - } - - protected void changeStatus(DeviceStatus s) { - if (_manager != null) - _manager.deviceStatusChanged(this, _status = s); - } - - @Override - public void initializeNode() { - changeStatus(DeviceStatus.INITIALIZING); - _nodePlugin.inputPluginStart(); - changeStatus(DeviceStatus.INITIALIZED); - } - - @Override - public Iterable> getSensors() { - return _nodePlugin.getSensors(); - } - - @Override - public void finalizeNode() { - changeStatus(DeviceStatus.FINALIZING); - _nodePlugin.inputPluginStop(); - changeStatus(DeviceStatus.FINALIZED); - } - - // Getters - - @Override - public DeviceStatus getStatus() { - return _status; - } - - FlowsMan getManager() { - return _manager; - } - - NodePlugin getPlugIn() { - return _nodePlugin; - } -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/NodePlugin.java b/src/main/java/eu/fbk/mpba/sensorsflows/NodePlugin.java deleted file mode 100644 index 7b2e52da..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/NodePlugin.java +++ /dev/null @@ -1,12 +0,0 @@ -package eu.fbk.mpba.sensorsflows; - -import eu.fbk.mpba.sensorsflows.base.IPlugin; - -public interface NodePlugin extends IPlugin { - - void inputPluginStart(); - - void inputPluginStop(); - - Iterable> getSensors(); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/OutputDecorator.java b/src/main/java/eu/fbk/mpba/sensorsflows/OutputDecorator.java deleted file mode 100644 index da87ad79..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/OutputDecorator.java +++ /dev/null @@ -1,166 +0,0 @@ -package eu.fbk.mpba.sensorsflows; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; - -import eu.fbk.mpba.sensorsflows.base.IOutput; -import eu.fbk.mpba.sensorsflows.base.IOutputCallback; -import eu.fbk.mpba.sensorsflows.base.ISensor; -import eu.fbk.mpba.sensorsflows.base.OutputStatus; -import eu.fbk.mpba.sensorsflows.base.SensorDataEntry; -import eu.fbk.mpba.sensorsflows.base.SensorEventEntry; -import eu.fbk.mpba.sensorsflows.base.SensorStatus; - -/** - * This class adds internal support for the library data-paths. - * Polls but has a fixed sleep time in the case that each queue is empty. - */ -class OutputDecorator implements IOutput { - private IOutputCallback _manager = null; - - private boolean _stopPending = false; - private OutputStatus _status = OutputStatus.NOT_INITIALIZED; - private Object sessionTag = "unspecified"; - private OutputPlugin outputPlugIn; - private List linkedSensors; - - private ArrayBlockingQueue> _eventsQueue; - private ArrayBlockingQueue> _dataQueue; - private boolean enabled = true; - - protected OutputDecorator(OutputPlugin output, IOutputCallback manager) { - _manager = manager; - linkedSensors = new ArrayList<>(); - outputPlugIn = output; - int dataQueueCapacity = 100; - int eventsQueueCapacity = 50; - // TODO POI Adjust the capacity - _eventsQueue = new ArrayBlockingQueue<>(eventsQueueCapacity); - // TODO POI Adjust the capacity - _dataQueue = new ArrayBlockingQueue<>(dataQueueCapacity); - } - - private Thread _thread = new Thread(new Runnable() { - @Override - public void run() { - outputPlugIn.outputPluginStart(sessionTag, linkedSensors); - changeStatus(OutputStatus.INITIALIZED); - dispatchLoopWhileNotStopPending(); - outputPlugIn.outputPluginStop(); - changeStatus(OutputStatus.FINALIZED); - } - }); - - private void dispatchLoopWhileNotStopPending() { - SensorDataEntry data; - SensorEventEntry event; - while (true) { - data = _dataQueue.poll(); - event = _eventsQueue.poll(); - if (data != null) - outputPlugIn.newSensorData(data); - if (event != null) - outputPlugIn.newSensorEvent(event); - else if (data == null) - if (_stopPending) - break; - else - try { - long sleepInterval = 50; // TODO POI polling timestamp here - Thread.sleep(sleepInterval); - } catch (InterruptedException e) { -// Log.w(LOG_TAG, "InterruptedException in OutputImpl.run() find-me:fnh294he97"); - } - } - } - - private void changeStatus(OutputStatus s) { - if (_manager != null) - _manager.outputStatusChanged(this, _status = s); - } - - // Implemented Callbacks - - @Override - public void initializeOutput(Object sessionTag) { - this.sessionTag = sessionTag; - changeStatus(OutputStatus.INITIALIZING); - // outputPlugIn.outputPluginStart(...) in _thread - _thread.start(); - } - - @Override - public void finalizeOutput() { - changeStatus(OutputStatus.FINALIZING); - _stopPending = true; - try { - _thread.join(); // FIXME POI Indefinite wait - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - @Override - public void sensorStatusChanged(ISensor sensor, TimeT time, SensorStatus state) { - } - - @Override - public void sensorEvent(ISensor sensor, TimeT time, int type, String message) { - try { - // FIXME WARN Locks the sensor's thread - _eventsQueue.put(new SensorEventEntry<>(sensor, time, type, message)); - } catch (InterruptedException e) { -// Log.w(LOG_TAG, "InterruptedException in OutputImpl.sensorEvent() find-me:924nj89f8j2"); - } - } - - @Override - public void sensorValue(ISensor sensor, TimeT time, ValueT value) { - try { - // FIXME WARN Locks the sensor's thread - SensorDataEntry a = new SensorDataEntry<>(sensor, time, value); - _dataQueue.put(a); - } catch (InterruptedException e) { -// Log.w(LOG_TAG, "InterruptedException in OutputImpl.sensorValue() find-me:24bhi5ti89"); - } - } - - // Setters - - public void addSensor(ISensor s) { - linkedSensors.add(s); - } - - // Getters - - @Override - public OutputStatus getStatus() { - return _status; - } - - OutputPlugin getPlugIn() { - return outputPlugIn; - } - - /** - * Unregisters every sensor linked - */ - public void close() { - linkedSensors.clear(); - } - - @Override - protected void finalize() throws Throwable { - close(); - super.finalize(); - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - public boolean isEnabled() { - return enabled; - } -} \ No newline at end of file diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/OutputPlugin.java b/src/main/java/eu/fbk/mpba/sensorsflows/OutputPlugin.java deleted file mode 100644 index cc8f0fa3..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/OutputPlugin.java +++ /dev/null @@ -1,19 +0,0 @@ -package eu.fbk.mpba.sensorsflows; - -import java.util.List; - -import eu.fbk.mpba.sensorsflows.base.ISensor; -import eu.fbk.mpba.sensorsflows.base.IPlugin; -import eu.fbk.mpba.sensorsflows.base.SensorDataEntry; -import eu.fbk.mpba.sensorsflows.base.SensorEventEntry; - -public interface OutputPlugin extends IPlugin { - - void outputPluginStart(Object sessionTag, List streamingSensors); - - void outputPluginStop(); - - void newSensorEvent(SensorEventEntry event); - - void newSensorData(SensorDataEntry data); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/SensorComponent.java b/src/main/java/eu/fbk/mpba/sensorsflows/SensorComponent.java deleted file mode 100644 index 169740ba..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/SensorComponent.java +++ /dev/null @@ -1,149 +0,0 @@ -package eu.fbk.mpba.sensorsflows; - -import java.util.ArrayList; -import java.util.List; - -import eu.fbk.mpba.sensorsflows.base.IMonoTimestampSource; -import eu.fbk.mpba.sensorsflows.base.ISensor; -import eu.fbk.mpba.sensorsflows.base.ISensorDataCallback; -import eu.fbk.mpba.sensorsflows.base.SensorStatus; -import eu.fbk.mpba.sensorsflows.util.ReadOnlyIterable; - -/** - * This class adds internal support for the library data-paths. - */ -public abstract class SensorComponent implements ISensor { - protected NodePlugin _parent = null; - protected List, TimeT, ValueT>> _handler = new ArrayList<>(); - protected ArrayList> _outputs = new ArrayList<>(); - - protected SensorComponent(NodePlugin parent) { - _parent = parent; - } - - private int mForwardedMessages = 0; - private boolean mListened = true; - protected SensorStatus mStatus = SensorStatus.OFF; - - private static long _bootTime = System.currentTimeMillis() * 1_000_000L - System.nanoTime(); - - private static IMonoTimestampSource _time = new IMonoTimestampSource() { - - @Override - public long getMonoUTCNanos() { - return System.nanoTime() + _bootTime; - } - - @Override - public long getMonoUTCNanos(long realTimeNanos) { - return realTimeNanos + _bootTime; - } - - @Override - public long getMonoUTCMillis() { - return getMonoUTCNanos() / 1_000_000; - } - - @Override - public long getMonoUTCMillis(long realTimeNanos) { - return getMonoUTCNanos(realTimeNanos) / 1_000_000; - } - }; - - void addOutput(OutputDecorator _output) { - _outputs.add(_output); - } - - void registerManager(ISensorDataCallback, TimeT, ValueT> man) { - _handler.add(man); - } - - void unregisterManager(ISensorDataCallback, TimeT, ValueT> man) { - _handler.remove(man); - } - - Iterable> getOutputs() { - return new ReadOnlyIterable<>(_outputs.iterator()); - } - - // Managed protected getters setters - - protected void changeStatus(SensorStatus state) { - for (ISensorDataCallback, TimeT, ValueT> i : _handler) { -// if (i instanceof FlowsMan && ((FlowsMan)i).getStatus() == EngineStatus.CLOSED) -// _handler.remove(i); - i.sensorStatusChanged(this, null, mStatus = state); - } - } - - /** - * Unregisters every outputDecorator - */ - public void close() { - _handler.clear(); - _outputs.clear(); - } - - @Override - protected void finalize() throws Throwable { - close(); - super.finalize(); - } - - // Managed Overrides - - public NodePlugin getParentDevicePlugin() { - return _parent; - } - - @Override - public SensorStatus getStatus() { - return mStatus; - } - - public String getName() { - return this.getClass().getSimpleName(); - } - - public static IMonoTimestampSource getSTime() { - return _time; - } - - public IMonoTimestampSource getTime() { - return _time; - } - - // Notify methods - - public void sensorValue(TimeT time, ValueT value) { - for (ISensorDataCallback, TimeT, ValueT> i : _handler) { -// if (i instanceof FlowsMan && ((FlowsMan)i).getStatus() == EngineStatus.CLOSED) -// _handler.remove(i); - i.sensorValue(this, time, value); - } - mForwardedMessages++; - } - - public void sensorEvent(TimeT time, int type, String message) { - for (ISensorDataCallback, TimeT, ValueT> i : _handler) { -// if (i instanceof FlowsMan && ((FlowsMan)i).getStatus() == EngineStatus.CLOSED) -// _handler.remove(i); - i.sensorEvent(this, time, type, message); - } - mForwardedMessages++; - } - - // Listenage - - public boolean isListened() { - return mListened; - } - - public void setListened(boolean listened) { - this.mListened = listened; - } - - // To implement - - public abstract List getValueDescriptor(); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/DeviceStatus.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/DeviceStatus.java deleted file mode 100644 index 8d4d1853..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/DeviceStatus.java +++ /dev/null @@ -1,5 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public enum DeviceStatus { - NOT_INITIALIZED, INITIALIZING, INITIALIZED, FINALIZING, FINALIZED -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/EngineStatus.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/EngineStatus.java deleted file mode 100644 index 80614d98..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/EngineStatus.java +++ /dev/null @@ -1,5 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public enum EngineStatus { - STANDBY, PREPARING, STREAMING, FINALIZING, FINALIZED, CLOSING, CLOSED -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/EventCallback.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/EventCallback.java deleted file mode 100644 index cf6b9c9e..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/EventCallback.java +++ /dev/null @@ -1,5 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public interface EventCallback { - void handle(T sender, A state); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/IDeviceCallback.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/IDeviceCallback.java deleted file mode 100644 index 74df9ffe..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/IDeviceCallback.java +++ /dev/null @@ -1,11 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -/** - * Main interface for the device's data transport. - * - * @param the desired type of device: it should be at least an INode. - */ -public interface IDeviceCallback { - // TODO (#is never used) check why it is not used - void deviceStatusChanged(DeviceT device, DeviceStatus state); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/IMonoTimestampSource.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/IMonoTimestampSource.java deleted file mode 100644 index 6c5c58fd..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/IMonoTimestampSource.java +++ /dev/null @@ -1,11 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -/** - * Gives support to keep a monotonic timestamp reference - */ -public interface IMonoTimestampSource { - long getMonoUTCNanos(); - long getMonoUTCNanos(long realTimeNanos); - long getMonoUTCMillis(); - long getMonoUTCMillis(long realTimeNanos); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/INode.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/INode.java deleted file mode 100644 index 74b58cf7..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/INode.java +++ /dev/null @@ -1,17 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -/** - * Main interface for a device. - * - * The user should access to these methods only to have a higher control of the operation. - */ -public interface INode { - - void initializeNode(); - - Iterable getSensors(); - - DeviceStatus getStatus(); - - void finalizeNode(); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/IOutput.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/IOutput.java deleted file mode 100644 index 102fab30..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/IOutput.java +++ /dev/null @@ -1,16 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -/** - * Main interface for the data management. - * TimeT and ValueT must be the same for the whole library. - */ -public interface IOutput extends ISensorDataCallback { - - void initializeOutput(Object sessionTag); - - OutputStatus getStatus(); - - void finalizeOutput(); - - void close(); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/IOutputCallback.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/IOutputCallback.java deleted file mode 100644 index 2447a70f..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/IOutputCallback.java +++ /dev/null @@ -1,5 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public interface IOutputCallback { - void outputStatusChanged(IOutput sender, OutputStatus status); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/IPlugin.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/IPlugin.java deleted file mode 100644 index b8f55acd..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/IPlugin.java +++ /dev/null @@ -1,6 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public interface IPlugin { - String getName(); - void close(); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/ISensor.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/ISensor.java deleted file mode 100644 index 711d8469..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/ISensor.java +++ /dev/null @@ -1,27 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -import java.util.List; - -import eu.fbk.mpba.sensorsflows.NodePlugin; - -/** - * Main control interface for a sensor. - * - * The user should access to these methods only to have a higher control of the operation. - */ -public interface ISensor { - - void switchOnAsync(); - - void switchOffAsync(); - - SensorStatus getStatus(); - - NodePlugin getParentDevicePlugin(); - - List getValueDescriptor(); - - String getName(); - - IMonoTimestampSource getTime(); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/ISensorDataCallback.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/ISensorDataCallback.java deleted file mode 100644 index 23757e6e..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/ISensorDataCallback.java +++ /dev/null @@ -1,15 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -/** - * Main interface for a sensor's data and state callback. - * Multiple sensors call this so the sender parameter is the sender sensor. - * The receiver should implement this. - */ -public interface ISensorDataCallback { - - void sensorStatusChanged(SensorT sensor, TimeT time, SensorStatus state); - - void sensorEvent(SensorT sensor, TimeT time, int type, String message); - - void sensorValue(SensorT sensor, TimeT time, ValueT value); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/IUserInterface.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/IUserInterface.java deleted file mode 100644 index 5b0a6f89..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/IUserInterface.java +++ /dev/null @@ -1,58 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -import eu.fbk.mpba.sensorsflows.OutputPlugin; - -/** - * Main interface for the flows control. - * - * The instance is used by the user! - * - * The user can control the enumeration of the devices and the outputs and the links between - * these, their operation and the operation of the engine. - */ -@SuppressWarnings("UnusedDeclaration") -public interface IUserInterface { - - - // ITEMS ENUMERATION control part - - void addInput(DeviceT device); - - DeviceT getInput(String name); - - Iterable getDevices(); - - void addOutput(OutputT output); - - OutputT getOutput(String name); - - Iterable getOutputs(); - - void addLink(SensorT fromSensor, OutputT toOutput); - - // ITEMS OPERATION control part - - void setOutputEnabled(boolean enabled, String name); - - boolean getOutputEnabled(String name); - - // ENGINE OPERATION control part - - void start(); - - void setPaused(boolean streaming); - - boolean isPaused(); - - void stop(); - - void close(); - - void setOnStatusChanged(EventCallback, EngineStatus> callback); - - void setOnDeviceStatusChanged(EventCallback callback); - - void setOnOutputStatusChanged(EventCallback callback); - - EngineStatus getStatus(); -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/OutputStatus.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/OutputStatus.java deleted file mode 100644 index 332ef8b7..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/OutputStatus.java +++ /dev/null @@ -1,5 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public enum OutputStatus { - NOT_INITIALIZED, INITIALIZING, INITIALIZED, FINALIZING, FINALIZED -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorDataEntry.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorDataEntry.java deleted file mode 100644 index e608d571..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorDataEntry.java +++ /dev/null @@ -1,13 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public class SensorDataEntry { - public SensorDataEntry(ISensor sensor, TimeT timestamp, ValueT value) { - this.sensor = sensor; - this.timestamp = timestamp; - this.value = value; - } - - public ISensor sensor; - public TimeT timestamp; - public ValueT value; -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorEventEntry.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorEventEntry.java deleted file mode 100644 index d7b1113b..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorEventEntry.java +++ /dev/null @@ -1,15 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public class SensorEventEntry { - public SensorEventEntry(ISensor sensor, TimeT timestamp, int code, String message){ - this.sensor = sensor; - this.timestamp = timestamp; - this.code = code; - this.message = message; - } - - public ISensor sensor; - public TimeT timestamp; - public int code; - public String message; -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorStatus.java b/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorStatus.java deleted file mode 100644 index 1d5b3189..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/base/SensorStatus.java +++ /dev/null @@ -1,5 +0,0 @@ -package eu.fbk.mpba.sensorsflows.base; - -public enum SensorStatus { - OFF, ON, ERROR -} diff --git a/src/main/java/eu/fbk/mpba/sensorsflows/util/ReadOnlyIterable.java b/src/main/java/eu/fbk/mpba/sensorsflows/util/ReadOnlyIterable.java deleted file mode 100644 index 51825479..00000000 --- a/src/main/java/eu/fbk/mpba/sensorsflows/util/ReadOnlyIterable.java +++ /dev/null @@ -1,49 +0,0 @@ -package eu.fbk.mpba.sensorsflows.util; - -import java.util.Iterator; - -/** - * Allows to convert an iterator to an enumeration - */ -public class ReadOnlyIterable implements Iterable{ - private Iterator i; - - public ReadOnlyIterable(Iterator i) { - this.i = i; - } - - /** - * Returns an {@link java.util.Iterator} for the elements in this object. - * - * @return An {@code Iterator} instance. - */ - @Override - public Iterator iterator() { - return new Iterator() { - - @Override - public boolean hasNext() { - return i.hasNext(); - } - - @Override - public E next() { - return i.next(); - } - - /** - * Removes the last object returned by {@code next} from the collection. - * This method can only be called once between each call to {@code next}. - * - * @throws UnsupportedOperationException if removing is not supported by the collection being - * iterated. - * @throws IllegalStateException if {@code next} has not been called, or {@code remove} has - * already been called after the last call to {@code next}. - */ - @Override - public void remove() { - throw new UnsupportedOperationException("Read Only collection."); - } - }; - } -} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/InputUnitTest.java b/src/test/java/eu/fbk/mpba/sensorflow/InputUnitTest.java new file mode 100644 index 00000000..ee9d241e --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/InputUnitTest.java @@ -0,0 +1,122 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class InputUnitTest { + + private final String SESSION_ID = "TestSession"; + + @Test + public void test_muting() throws InterruptedException { + SensorFlow sf = new SensorFlow(SESSION_ID); + + MockInput i = new MockInput(null, null); + MockOutput o = new MockOutput("Output"); + + sf.add(o); + sf.add(i); + + assertFalse(i.isMuted()); + long before = o.receivedLines.get(); + Thread.sleep(100); + + i.mute(); + assertTrue(i.isMuted()); + Thread.sleep(50); + long after = o.receivedLines.get(); + + Thread.sleep(75); + assertTrue(i.isMuted()); + assertEquals(after, o.receivedLines.get()); + assertNotEquals(before, after); + assertTrue(before < after); + + i.unmute(); + Thread.sleep(75); + assertTrue(after < o.receivedLines.get()); + assertTrue(before < o.receivedLines.get()); + assertFalse(i.isMuted()); + + sf.close(); + } + + @Test + public void test_timeSource() throws InterruptedException { + long monoUTCNanos = Input.getTimeSource().getMonoUTCNanos(); + Thread.sleep(0, 10); + long monoUTCNanos1 = Input.getTimeSource().getMonoUTCNanos(); + assertTrue(monoUTCNanos <= monoUTCNanos1 - 10); + + long bootNanos = + System.currentTimeMillis() * 1_000_000L + - System.nanoTime(); + + // 100ms + assertTrue(Math.abs(bootNanos - Input.getTimeSource().getMonoUTCNanos(0)) < 100_000_000L); + + // 100ms + assertTrue(Math.abs( + System.currentTimeMillis() * 1_000_000L + - Input.getTimeSource().getMonoUTCNanos(System.nanoTime())) + < 100_000_000L); + } + + @Test + public void test_getHeader_constructors() { + String[] h = new String[]{"x", "y", "z", "x", "y", "z", "x", "y", "z", "x", "y", "z"}; + Input i1 = new Input(Arrays.asList(h)) { + @Override + public void onCreate() { + + } + + @Override + public void onAdded() { + + } + + @Override + public void onRemoved() { + + } + + @Override + public void onClose() { + + } + }; + Input i2 = new Input(null, Arrays.asList(h)) { + @Override + public void onCreate() { + + } + + @Override + public void onAdded() { + + } + + @Override + public void onRemoved() { + + } + + @Override + public void onClose() { + + } + }; + assertArrayEquals(h, i1.getHeader().toArray(new String[h.length])); + assertArrayEquals(h, i2.getHeader().toArray(new String[h.length])); + } +} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/ManagersUnitTest.java b/src/test/java/eu/fbk/mpba/sensorflow/ManagersUnitTest.java new file mode 100644 index 00000000..e058494c --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/ManagersUnitTest.java @@ -0,0 +1,72 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Test; + +public class ManagersUnitTest { + @Test(expected = UnsupportedOperationException.class) + public void test_OutOfPlaceEvents_output_onStopAndClose() { + OutputManager o = new OutputManager(new MockOutput("Out"), + new OutputObserver() { + @Override + public void outputStatusChanged(OutputManager sender, PluginStatus status) { + + } + }, true); + + o.onStopAndClose(); + } + + @Test(expected = UnsupportedOperationException.class) + public void test_OutOfPlaceEvents_output_onCreateAndStart() { + OutputManager o = new OutputManager(new MockOutput("Out"), + new OutputObserver() { + @Override + public void outputStatusChanged(OutputManager sender, PluginStatus status) { + + } + }, true); + + o.onCreateAndAdded("HiPedro"); + o.onCreateAndAdded("HiPedro"); + } + + @Test(expected = UnsupportedOperationException.class) + public void test_OutOfPlaceEvents_input_onCreate() { + InputManager o = new InputManager(new MockInput(null, "In"), + new InputObserver() { + @Override + public void inputStatusChanged(InputManager input, PluginStatus state) { + + } + }); + + o.onCreate(); + o.onCreate(); + } + + @Test(expected = UnsupportedOperationException.class) + public void test_OutOfPlaceEvents_input_onAdded() { + InputManager o = new InputManager(new MockInput(null, "In"), + new InputObserver() { + @Override + public void inputStatusChanged(InputManager input, PluginStatus state) { + + } + }); + + o.onAdded(); + } + + @Test(expected = UnsupportedOperationException.class) + public void test_OutOfPlaceEvents_input_onRemovedAndClose() { + InputManager o = new InputManager(new MockInput(null, "In"), + new InputObserver() { + @Override + public void inputStatusChanged(InputManager input, PluginStatus state) { + + } + }); + + o.onRemovedAndClose(); + } +} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/MockInput.java b/src/test/java/eu/fbk/mpba/sensorflow/MockInput.java new file mode 100644 index 00000000..f15b61ef --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/MockInput.java @@ -0,0 +1,79 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Assert; + +import java.util.Random; + +import static eu.fbk.mpba.sensorflow.sense.Stream.HEADER_VALUE; + + +public class MockInput extends Input { + + private PluginStatus testStatus = PluginStatus.INSTANTIATED; + private Thread producer; + + MockInput(InputGroup parent, String name) { + super(parent, name, HEADER_VALUE); + } + + @Override + public void onCreate() { +// System.out.println("MockInput onCreateAndAdded"); + Assert.assertTrue(testStatus == PluginStatus.INSTANTIATED || testStatus == PluginStatus.CLOSED); + producer = new Thread(new Runnable() { + @Override + public void run() { + try { + int i = 0; + Random random = new Random(0); + while (testStatus != PluginStatus.CLOSED) { + while (testStatus == PluginStatus.ADDED) { + MockInput.this.pushValue(getTimeSource().getMonoUTCNanos(), new double[]{i++}); + sentLines++; + if (random.nextInt(15) == 0) { + MockInput.this.pushLog(getTimeSource().getMonoUTCNanos(), "Random gave 0/15"); + sentLines++; + } + Thread.sleep(8); + } + Thread.sleep(200); + } + } catch (InterruptedException e) { + // exit + } + } + }); + producer.start(); + testStatus = PluginStatus.CREATED; + } + + @Override + public void onAdded() { + Assert.assertTrue(testStatus == PluginStatus.CREATED || testStatus == PluginStatus.REMOVED); + testStatus = PluginStatus.ADDED; + } + + @Override + public void onRemoved() { + Assert.assertEquals(PluginStatus.ADDED, testStatus); + testStatus = PluginStatus.REMOVED; + } + + @Override + public void onClose() { + Assert.assertEquals(PluginStatus.REMOVED, testStatus); + testStatus = PluginStatus.CLOSED; + try { + producer.interrupt(); + producer.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public long getThreadId() { + return producer.getId(); + } + + long sentLines = 0; +} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/MockOutput.java b/src/test/java/eu/fbk/mpba/sensorflow/MockOutput.java new file mode 100644 index 00000000..e2eb44aa --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/MockOutput.java @@ -0,0 +1,70 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Assert; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicLong; + +public class MockOutput implements Output { + private final String name; + private String sessionId; + private HashSet testLinkedInputs; + + MockOutput(String name) { + this.name = name; + this.testLinkedInputs = new HashSet<>(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void onCreate(String sessionId) { + this.sessionId = sessionId; + } + + @Override + public void onClose() { + for (Input i : testLinkedInputs) + Log.l(i.getName()); + + Assert.assertTrue(testLinkedInputs.isEmpty()); + } + + final HashMap lastValueTimes = new HashMap<>(); + final HashMap lastLogTimes = new HashMap<>(); + + @Override + public void onInputAdded(Input input) { + Assert.assertFalse(input.getName() + " already present in n elements: " + testLinkedInputs.size(), + testLinkedInputs.contains(input)); + testLinkedInputs.add(input); + lastValueTimes.put(input, 0L); + lastLogTimes.put(input, 0L); + } + + @Override + public void onInputRemoved(Input input) { + Assert.assertTrue(testLinkedInputs.contains(input)); + testLinkedInputs.remove(input); + } + + @Override + public void onValue(Input input, long timestamp, double[] value) { + Assert.assertTrue(lastValueTimes.get(input) < timestamp); + lastValueTimes.put(input, timestamp); + receivedLines.incrementAndGet(); + } + + @Override + public void onLog(Input input, long timestamp, String text) { + Assert.assertTrue(lastLogTimes.get(input) < timestamp); + lastValueTimes.put(input, timestamp); + receivedLines.incrementAndGet(); + } + + AtomicLong receivedLines = new AtomicLong(0); +} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/MockWirelessDevice.java b/src/test/java/eu/fbk/mpba/sensorflow/MockWirelessDevice.java new file mode 100644 index 00000000..3dc922d2 --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/MockWirelessDevice.java @@ -0,0 +1,89 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Assert; + +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import eu.fbk.mpba.sensorflow.sense.Stream; +import eu.fbk.mpba.sensorflow.sense.WirelessDevice; + +public class MockWirelessDevice extends WirelessDevice { + private PluginStatus testStatus = PluginStatus.INSTANTIATED; + + MockWirelessDevice(String name, String configuration) { + setName(name); + setConfiguration(configuration); + } + + @Override + public void onCreate() { + Assert.assertEquals(PluginStatus.INSTANTIATED, testStatus); + for (int i = 0; i < 10; i++) { + final Stream s = new Stream(this, Stream.HEADER_VALUE, "MockStream" + i); + new Thread(new Runnable() { + @Override + public void run() { + try { + int i1 = 0; + Random random = new Random(0); + while (testStatus != PluginStatus.CLOSED) { + while (testStatus == PluginStatus.ADDED) { + s.pushValue(Input.getTimeSource().getMonoUTCNanos(), new double[]{i1++}); + sentLines.incrementAndGet(); + int rnd = random.nextInt(100); + if (rnd < 20) { + if (rnd < 8) { + s.pushLog(Input.getTimeSource().getMonoUTCNanos(), "Random gave 0-7/15"); + } else switch (rnd) { + case 9: + s.pushQuality("Buona qualità, yess"); + break; + case 10: + s.pushQuality("Cattiva qualità, néh"); + break; + default: + s.pushQuality(Input.getTimeSource().getMonoUTCNanos(), ((rnd - 10) * 60 / 10 + 40) + "%"); + break; + } + sentLines.incrementAndGet(); + } + Thread.sleep(8); + } + Thread.sleep(100); + } + } catch (InterruptedException e) { + // exit + } + } + }).start(); + addStream(s); + } + testStatus = PluginStatus.CREATED; + } + + @Override + public void onAdded() { + Assert.assertTrue(testStatus == PluginStatus.CREATED || testStatus == PluginStatus.REMOVED); + testStatus = PluginStatus.ADDED; + } + + @Override + public void onRemoved() { + Assert.assertEquals(PluginStatus.ADDED, testStatus); + testStatus = PluginStatus.REMOVED; + } + + @Override + public void onClose() { + Assert.assertEquals(PluginStatus.REMOVED, testStatus); + testStatus = PluginStatus.CLOSED; + } + + @Override + public void connect(Object done) { + + } + + AtomicLong sentLines = new AtomicLong(0); +} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/OutputBufferTest.java b/src/test/java/eu/fbk/mpba/sensorflow/OutputBufferTest.java new file mode 100644 index 00000000..b39113c6 --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/OutputBufferTest.java @@ -0,0 +1,216 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class OutputBufferTest { + + @Test(expected = NullPointerException.class) + public void test_nullDrain() { + new OutputBuffer(null, 0, false); + } + + @Test(expected = IllegalArgumentException.class) + public void test_sizeZero() { + new OutputBuffer(new MockOutput(""), 0, false); + } + + @Test + public void test_getName_events() { + final boolean[] cond = new boolean[2]; + final String name = "CustomMockOutput20957"; + + MockOutput o = new MockOutput(name) { + @Override + public void onCreate(String sessionId) { + super.onCreate(sessionId); + cond[0] = true; + } + + @Override + public void onClose() { + super.onClose(); + cond[1] = true; + } + }; + OutputBuffer outputBuffer = new OutputBuffer(o, 10, false); + + outputBuffer.onCreate("TestSession523"); + outputBuffer.onClose(); + + assertEquals(name, outputBuffer.getName()); + + assertTrue(cond[0]); + assertTrue(cond[1]); + } + + @Test + public void test_sequential() throws InterruptedException { + final String sequence = "aaavlvalvlvllvvlvlvrvllrvvrrvvvvvvarvllvlvvvvrrvr"; + final boolean[] done = new boolean[]{false}; + + final OutputBuffer q = new OutputBuffer(new Output() { + @Override + public void onCreate(String sessionId) { } + @Override + public String getName() { return null; } + + @Override + public void onClose() { + assertTrue(counter == sequence.length()); + done[0] = true; + } + + int counter = 0; + + @Override + public void onInputAdded(Input input) { + assertTrue(sequence.charAt(counter++) == 'a'); + } + + @Override + public void onInputRemoved(Input input) { + assertTrue(sequence.charAt(counter++) == 'r'); + } + + @Override + public void onValue(Input input, long timestamp, double[] value) { + assertTrue(sequence.charAt(counter++) == 'v'); + } + + @Override + public void onLog(Input input, long timestamp, String text) { + assertTrue(sequence.charAt(counter++) == 'l'); + } + }, sequence.length() + 1, false); + + Input a = new MockInput(null, null); + + for (int i = 0; i < sequence.length(); i++) { + assertTrue(q.size() == i); + assertTrue(q.remainingCapacity() == sequence.length() + 1 - i); + switch (sequence.charAt(i)) { + case 'a': + q.onInputAdded(a); + break; + case 'r': + q.onInputRemoved(a); + break; + case 'v': + q.onValue(a, Input.getTimeSource().getMonoUTCNanos(), new double[]{1}); + break; + case 'l': + q.onLog(a, Input.getTimeSource().getMonoUTCNanos(), "hi"); + break; + } + assertTrue(q.size() == i + 1); + assertTrue(q.remainingCapacity() == sequence.length() - i); + } + + assertTrue(q.remainingCapacity() == 1); + + for (int i = 0; i < sequence.length(); i++) { + assertTrue(q.size() == sequence.length() - i); + assertTrue(q.remainingCapacity() == i + 1); + q.pollToHandler(1, TimeUnit.SECONDS); + } + + assertTrue(q.size() == 0); + + q.getHandler().onClose(); + assertTrue(done[0]); + } + + @Test + public void test_full_complex() throws InterruptedException { + Output o = new Output() { + @Override + public void onCreate(String sessionId) { } + @Override + public String getName() { return null; } + @Override + public void onClose() { } + + Input last = null; + + @Override + public void onInputAdded(Input input) { + assertTrue("Input not removed before adding new one.", last == null); + last = input; + } + + @Override + public void onInputRemoved(Input input) { + assertTrue("Input not added before removing it.", last != null); + last = null; + } + + long lastTime = 0; + + @Override + public void onValue(Input input, long timestamp, double[] value) { + assertTrue("Value without an input.", last != null); + assertTrue("Different input sending values.", last == input); + assertTrue("Wrong name composed with index.", input.getName().equals("MockInput" + (int) value[0])); + assertTrue("Time not monotonic.", timestamp >= lastTime); + assertTrue("Time not strictly monotonic.", timestamp > lastTime); + assertTrue(value[1] == 1 || value[1] == 2); + } + + int lastLogI = -1; + + @Override + public void onLog(Input input, long timestamp, String text) { + assertTrue("Log without an input.", last != null); + assertTrue("Different input sending logs.", last == input); + assertTrue("Time not monotonic.", timestamp >= lastTime); + assertTrue("Time not strictly monotonic.", timestamp > lastTime); + assertTrue("Wrong text", text.startsWith("HiPedro ")); + int i = Integer.parseInt(text.substring("HiPedro ".length()), 10); + assertTrue("Wrong index", i > lastLogI); + lastLogI = i; + } + }; + + ArrayList mi = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + MockInput in = new MockInput(null, "MockInput" + i); + mi.add(i, in); + } + + final OutputBuffer q = new OutputBuffer(o, 1000, false); + new Thread(new Runnable() { + @Override + public void run() { + try { + //noinspection InfiniteLoopStatement + while (true) + q.pollToHandler(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + + for (int i = 0; i < mi.size() * 1000; i++) { + if (i % 1000 == 0) { + q.onInputAdded(mi.get(i / 1000)); + } else if (i % 1000 == 999) { + q.onInputRemoved(mi.get(i / 1000)); + } else { + if (i % 3 == 0) { + q.onLog(mi.get(i / 1000), Input.getTimeSource().getMonoUTCNanos(), "HiPedro " + i); + } else { + q.onValue(mi.get(i / 1000), Input.getTimeSource().getMonoUTCNanos(), new double[]{i / 1000, i % 3, i}); + } + } + } + + q.clear(); + } +} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/SensorFlowTest.java b/src/test/java/eu/fbk/mpba/sensorflow/SensorFlowTest.java new file mode 100644 index 00000000..427d840d --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/SensorFlowTest.java @@ -0,0 +1,121 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; + +public class SensorFlowTest { + + @Test + public void test_Log() { + Log.l(); + Log.s(); + } + + @Test + public void test_sf_oneOutputThreaded() throws InterruptedException { + Log.enabled = true; + Log.l(Thread.currentThread().getName()); + + SensorFlow sf = new SensorFlow(); + MockOutput mo = new MockOutput("consumer"); + ArrayList mi = new ArrayList<>(); + for (int i = 0; i < 10; i++) + mi.add(i, new MockInput(null, "MockInput" + i)); + + sf.add(mo); + sf.add(mi); + + Thread.sleep(1500); + + sf.close(); + + long sent = 0; + for (InputGroup i : mi) + sent += ((MockInput)i).sentLines; + + Log.l("Sent: " + sent); + Log.l("Received: " + mo.receivedLines); + Assert.assertTrue(sent == mo.receivedLines.get()); + } + + @Test + public void test_sf_oneOutputNonThreaded() throws InterruptedException { + Log.enabled = true; + Log.l(Thread.currentThread().getName()); + + SensorFlow sf = new SensorFlow(); + MockOutput mo = new MockOutput("consumer"); + ArrayList mi = new ArrayList<>(); + for (int i = 0; i < 10; i++) + mi.add(i, new MockInput(null, "MockInput" + i)); + + sf.addInThread(mo); + sf.add(mi); + + Thread.sleep(1500); + + for (InputGroup inputGroup : mi) { + inputGroup.onRemoved(); + } + + long sent = 0; + for (InputGroup i : mi) + sent += ((MockInput)i).sentLines; + + Log.l("Sent: " + sent); + Log.l("Received: " + mo.receivedLines); + Assert.assertTrue(sent == mo.receivedLines.get()); + } + + @Test + public void test_sf_routing() throws InterruptedException { + final int NUM = 10; + + ArrayList mi = new ArrayList<>(); + for (int i = 0; i < NUM; i++) + mi.add(i, new MockInput(null, "MockInput" + i)); + ArrayList mo = new ArrayList<>(); + for (int i = 0; i < NUM; i++) + mo.add(i, new MockOutput("MockOutput" + i)); + + SensorFlow sf = new SensorFlow(); + for (Output p : mo) { + sf.addNotRouted(p); + } + + sf.addNotRouted(mi); + + for (Output o : mo) { + Assert.assertTrue(((MockOutput)o).receivedLines.get() == 0); + } + + sf.routeClear(); + for (Output o : mo) { + Assert.assertTrue(((MockOutput)o).receivedLines.get() == 0); + } + + sf.routeAll(); + Thread.sleep(100); + for (Output o : mo) { + Assert.assertTrue(((MockOutput)o).receivedLines.get() > 0); + } + + sf.routeClear(); + Thread.sleep(100); + final long[] sum = new long[]{0}; + for (Output o : mo) { + sum[0] += ((MockOutput)o).receivedLines.get(); + } + + Assert.assertTrue("One for each assumption failed", sum[0] > NUM); + + Thread.sleep(100); + for (Output o : mo) { + sum[0] -= ((MockOutput)o).receivedLines.get(); + } + + Assert.assertTrue("Things after routeClear", sum[0] == 0); + } +} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/SensorFlowUnitTest.java b/src/test/java/eu/fbk/mpba/sensorflow/SensorFlowUnitTest.java new file mode 100644 index 00000000..355a4ff4 --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/SensorFlowUnitTest.java @@ -0,0 +1,408 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class SensorFlowUnitTest { + + private final String SESSION_ID = "TestSession"; + private List inputs = new ArrayList<>(); + private List outputs = new ArrayList<>(); + + @Before + public void setup() { + for (int i = 0; i < 10; i++) + inputs.add(i, new MockInput(null, "MockInput" + i)); + + for (int i = 0; i < 10; i++) + outputs.add(i, new MockOutput("MockOutput" + i)); + } + + @Test + public void test_getSessionTag_finalize() throws Throwable { + SensorFlow sf = new SensorFlow(SESSION_ID); + + assertEquals(SESSION_ID, sf.getSessionTag()); + + //noinspection FinalizeCalledExplicitly + sf.finalize(); + } + + @Test + public void test_add_output() { + SensorFlow sf = new SensorFlow(SESSION_ID); + + sf.add(outputs.get(3)); + assertEquals(1, sf.getOutputs().size()); + sf.addNotRouted(outputs.get(5)); + assertEquals(2, sf.getOutputs().size()); + sf.add(outputs.get(3)); + assertEquals(2, sf.getOutputs().size()); + sf.addNotRouted(outputs.get(7)); + assertEquals(3, sf.getOutputs().size()); + + for (Output p : outputs) { + sf.add(p); + } + + assertEquals(outputs.size(), sf.getOutputs().size()); + + sf.close(); + } + + @Test + public void test_add_input() { + SensorFlow sf = new SensorFlow(SESSION_ID); + + sf.add(inputs.get(3)); + assertEquals(1, sf.getInputs().size()); + sf.addNotRouted(inputs.get(5)); + assertEquals(2, sf.getInputs().size()); + sf.add(inputs.get(3)); + assertEquals(2, sf.getInputs().size()); + sf.addNotRouted(inputs.get(7)); + assertEquals(3, sf.getInputs().size()); + + sf.add(inputs); + assertEquals(inputs.size(), sf.getInputs().size()); + + sf.close(); + } + + @Test + public void test_remove_output() { + SensorFlow sf = new SensorFlow(SESSION_ID); + + for (Output p : outputs) { + sf.addNotRouted(p); + } + + + sf.remove(outputs.get(3)); + assertEquals(outputs.size() - 1, sf.getOutputs().size()); + sf.remove(outputs.get(5)); + assertEquals(outputs.size() - 2, sf.getOutputs().size()); + sf.remove(outputs.get(3)); + assertEquals(outputs.size() - 2, sf.getOutputs().size()); + sf.remove(outputs.get(7)); + assertEquals(outputs.size() - 3, sf.getOutputs().size()); + + sf.close(); + } + + @Test + public void test_remove_input() { + SensorFlow sf = new SensorFlow(SESSION_ID); + + for (InputGroup p : inputs) { + sf.addNotRouted(p); + } + + + sf.remove(inputs.get(3)); + assertEquals(inputs.size() - 1, sf.getInputs().size()); + sf.remove(inputs.get(5)); + assertEquals(inputs.size() - 2, sf.getInputs().size()); + sf.remove(inputs.get(3)); + assertEquals(inputs.size() - 2, sf.getInputs().size()); + + sf.close(); + } + + @Test + public void test_routes() { + SensorFlow sf = new SensorFlow(SESSION_ID); + + for (InputGroup p : inputs) { + sf.addNotRouted(p); + } + + for (Output p : outputs) { + sf.addNotRouted(p); + } + + + // Add all routes + sf.routeAll(); + + // Routes everywhere + for (InputGroup i : inputs) + for (Output o : outputs) + assertTrue(sf.isRouted((Input) i, o)); + + // Remove routes from x + for (int i = 0; i < inputs.size(); i++) + for (int j = 0; j < outputs.size(); j++) + if (i + j == inputs.size()) + sf.removeRoute((Input) inputs.get(i), outputs.get(j)); + + // No Routes in x places, Routes in x^ places + for (int i = 0; i < inputs.size(); i++) + for (int j = 0; j < outputs.size(); j++) + assertEquals("i:" + i + " j:" + j, + i + j != inputs.size(), + sf.isRouted((Input) inputs.get(i), outputs.get(j))); + + for (int i = 0; i < inputs.size(); i++) + for (int j = 0; j < outputs.size(); j++) + if (i + j == inputs.size()) + sf.addRoute((Input) inputs.get(i), outputs.get(j)); + + // Routes everywhere + for (InputGroup i : inputs) + for (Output o : outputs) + assertTrue(sf.isRouted((Input) i, o)); + + // Remove all routes + sf.routeClear(); + + // No Routes anywhere + for (InputGroup i : inputs) + for (Output o : outputs) + assertFalse(sf.isRouted((Input) i, o)); + + assertFalse(sf.isRouted(null, outputs.get(0))); + assertFalse(sf.isRouted((Input)inputs.get(0), null)); + assertFalse(sf.isRouted(null, null)); + + sf.close(); + } + + @Test + public void test_isOutputEnabled() { + SensorFlow sf = new SensorFlow(SESSION_ID); + + for (Output p : outputs) { + sf.addNotRouted(p); + } + + for (Output o : outputs) { + assertTrue(o.getName(), sf.isOutputEnabled(o)); + } + + + sf.close(); + } + + @Test(expected = IllegalArgumentException.class) + public void test_disableOutput_enableOutput() throws InterruptedException { + SensorFlow sf = new SensorFlow(SESSION_ID); + + for (Output p : outputs) { + sf.addInThreadNotRouted(p); + } + + for (InputGroup p : inputs) { + sf.addNotRouted(p); + } + + sf.routeAll(); + + long dataReceivedToggle = 0; + long dataReceivedActive = 0; + + for (int i = 0; i < outputs.size(); i++) + if (i % 2 == 0) { + sf.disableOutput(outputs.get(i)); + dataReceivedToggle += ((MockOutput) outputs.get(i)).receivedLines.get(); + } else + dataReceivedActive += ((MockOutput) outputs.get(i)).receivedLines.get(); + + Thread.sleep(100); + + for (int i = 0; i < outputs.size(); i++) + if (i % 2 == 0) { + dataReceivedToggle -= ((MockOutput) outputs.get(i)).receivedLines.get(); + assertFalse(sf.isOutputEnabled(outputs.get(i))); + } else { + dataReceivedActive -= ((MockOutput) outputs.get(i)).receivedLines.get(); + assertTrue(sf.isOutputEnabled(outputs.get(i))); + } + + dataReceivedActive *= -1; + dataReceivedToggle *= -1; + + // No data received after disable + assertEquals(0, dataReceivedToggle); + + // Data received meanwhile, at least one per InputGroup + assertTrue("dataReceivedActive: " + dataReceivedActive, dataReceivedActive > inputs.size()); + + dataReceivedToggle = 0; + dataReceivedActive = 0; + + for (int i = 0; i < outputs.size(); i++) + if (i % 2 == 0) { + dataReceivedToggle += ((MockOutput) outputs.get(i)).receivedLines.get(); + sf.enableOutput(outputs.get(i)); + } else + dataReceivedActive += ((MockOutput) outputs.get(i)).receivedLines.get(); + + Thread.sleep(100); + + for (int i = 0; i < outputs.size(); i++) { + if (i % 2 == 0) { + dataReceivedToggle -= ((MockOutput) outputs.get(i)).receivedLines.get(); + } else + dataReceivedActive -= ((MockOutput) outputs.get(i)).receivedLines.get(); + assertTrue(sf.isOutputEnabled(outputs.get(i))); + } + + dataReceivedActive *= -1; + dataReceivedToggle *= -1; + + // Data received after enable, at least one per InputGroup + assertTrue("dataReceivedToggle: " + dataReceivedToggle, dataReceivedToggle > inputs.size()); + + // Data received meanwhile, at least one per InputGroup + assertTrue(dataReceivedActive > inputs.size()); + + try { + sf.remove(outputs.get(0)); + sf.disableOutput(outputs.get(0)); + } finally { + sf.close(); + } + } + + private void assertThread(long id) { + assertEquals(Thread.currentThread().getId(), id); + } + + private void assertThread(Input i) { + assertEquals(Thread.currentThread().getId(), ((MockInput)i).getThreadId()); + } + + @Test + public void test_InThread() throws InterruptedException { + SensorFlow sf = new SensorFlow(SESSION_ID); + for (InputGroup p : inputs) { + sf.add(p); + } + + + final long id = Thread.currentThread().getId(); + + final boolean[] cond = new boolean[4]; + + sf.addInThread(new MockOutput("CustomMockOutput") { + @Override + public void onValue(Input input, long timestamp, double[] value) { + assertThread(input); + cond[0] = true; + } + + @Override + public void onLog(Input input, long timestamp, String text) { + assertThread(input); + cond[1] = true; + } + + @Override + public void onInputAdded(Input input) { + assertThread(id); + cond[2] = true; + } + + @Override + public void onInputRemoved(Input input) { + assertThread(id); + cond[3] = true; + } + }); + + Thread.sleep(300); + + sf.close(); + + assertTrue(cond[2]); + assertTrue(cond[3]); + assertTrue(cond[0]); + assertTrue(cond[1]); + } + + @Test + public void test_InThreadNotRouted() { + SensorFlow sf = new SensorFlow(SESSION_ID); + for (InputGroup p : inputs) { + sf.add(p); + } + + + final long id = Thread.currentThread().getId(); + + sf.addInThreadNotRouted(new MockOutput("CustomMockOutput") { + @Override + public void onValue(Input input, long timestamp, double[] value) { + assertThread(input); + } + + @Override + public void onLog(Input input, long timestamp, String text) { + assertThread(input); + } + + @Override + public void onInputAdded(Input input) { + assertThread(id); + } + + @Override + public void onInputRemoved(Input input) { + assertThread(id); + } + }); + + sf.routeAll(); + + sf.close(); + } + + @Test + public void test_NonInThread() { + SensorFlow sf = new SensorFlow(SESSION_ID); + for (InputGroup p : inputs) { + sf.add(p); + } + + final AtomicLong id = new AtomicLong(-1); + + sf.add(new MockOutput("CustomMockOutput") { + + @Override + public void onValue(Input input, long timestamp, double[] value) { + id.compareAndSet(-1, Thread.currentThread().getId()); + assertThread(id.get()); + } + + @Override + public void onLog(Input input, long timestamp, String text) { + id.compareAndSet(-1, Thread.currentThread().getId()); + assertThread(id.get()); + } + + @Override + public void onInputAdded(Input input) { + id.compareAndSet(-1, Thread.currentThread().getId()); + assertThread(id.get()); + } + + @Override + public void onInputRemoved(Input input) { + id.compareAndSet(-1, Thread.currentThread().getId()); + assertThread(id.get()); + } + }); + + sf.close(); + } +} diff --git a/src/test/java/eu/fbk/mpba/sensorflow/WirelessDeviceTest.java b/src/test/java/eu/fbk/mpba/sensorflow/WirelessDeviceTest.java new file mode 100644 index 00000000..e9d89f1e --- /dev/null +++ b/src/test/java/eu/fbk/mpba/sensorflow/WirelessDeviceTest.java @@ -0,0 +1,28 @@ +package eu.fbk.mpba.sensorflow; + +import org.junit.Assert; +import org.junit.Test; + +public class WirelessDeviceTest { + @Test + public void test_one() throws InterruptedException { + Log.enabled = true; + + MockWirelessDevice w = new MockWirelessDevice("WD1", ""); + + SensorFlow sf = new SensorFlow(); + + MockOutput o = new MockOutput("Out"); + + sf.add(o); + sf.add(w); + + Thread.sleep(1000); + + sf.close(); + + Log.l("Sent: " + w.sentLines); + Log.l("Received: " + o.receivedLines); + Assert.assertTrue(w.sentLines.get() == o.receivedLines.get()); + } +}