Skip to content

Pipelining

Thorben Kuck edited this page Jul 9, 2017 · 3 revisions

Pipelining is the central system, inside the Communication Registration.

Upon calling:

communicationRegistration.register(TestObject.class);

the CommunicationRegistration checks, whether or not this Communication for the given Class is specified. If not, it will create an ReceivePipeline instance and map it to the provided class. With that step done, the Communication for the Object TestObject is now specified and can be called. It will (idependent of the prvious, possible creation) than return the instance of that pipeline and you can modify it in whatever fashion you like. You can add an OnReceive (to the head or tail), clear the pipeline or bind it to an handler. Let's go over the functions in detail.

Adding OnReceive to the ReceivePipeline

If you just got the Pipeline, you can add some OnReceive(Single/Triple/) to it. Let's imagine the following code:

CommunicationRegistration communicationRegistration = ...
OnReceive<TestObject> onReceive = ...

ReceivePipeline<TestObject> pipeline = communicationRegistration.register(TestObject.class);
// Add to the head of the pipeline
pipeline.addFirst(onReceive);

We register the communication and add our onReceive to it. Of course we can write it in a functional style like this:

CommunicationRegistration communicationRegistration = ...
OnReceive<TestObject> onReceive = ...

communicationRegistration.register(TestObject.class).addFirst(onReceive);

Other than the addFirst method, we can also utilize one of the following methods:

CommunicationRegistration communicationRegistration = ...
OnReceive<TestObject> onReceive = ...

ReceivePipeline<TestObject> pipeline = communicationRegistration.register(TestObject.class);
// Add to the tail of the pipeline
pipeline.addLast(onReceive);
// Add to the head of the pipeline, but only if it is not already registered
pipeline.addFirstIfNotContained(onReceive);
// Add to the tail of the pipeline, but only if it is not already registered
pipeline.addLastIfNotContained(onReceive);

In the last snipped, we would have registered onReceive only once, becaue the second and third statement would not be finished propperly.

Error while registering(add*IfNotConained())

If you try to register an OnReceive(Single/Triple/) using the .add*IfNotContained(onReceive) method and the communication already contains the onReceive instance anywhere inside the ReceivePipeline, the method onAddFailed will be called. This method is provided by the CanBeRegistered interface and is initialized with nothing by default. The OnReceive-interface-family implements CanBeRegistered

Open / Close / Seal

An existing ReceivePipeline can be oppend, closed and sealed. By default a ReceivePipeline is open and not sealed.

You change the state via: .open(), .close() and .seal()

An open ReceivePipeline accepts all additions wile a closed ReceivePipeline does not accept any new additions. Sealing permanently fixes the state of open/close and is not reversable. It makes the ReceivePipeline (sort of) immutable. Additions or clearings to the ReceivePipeline are not possible if first closed and than sealed, whilst an first opened and than sealed ReceivePipeline will always be open for addition, no matter what you try to change.

An mission-critical Communication, that should not be tinkered with later at rutime, would be defined like this:

// All handlers that are needed in a List
List<OnReceive<Critical>> list = ...
CommunicationRegistration communicationRegistration = ..
Class<Critical> criticalClass = Critical.class

// Some other stuff, maybe asynchronous

// synchronize so that this class only has access over the CommunicationRegistration
synchronized(communicationRegistration) {
    if(communicationRegistration.isRegistered(criticalClass)) {
        communicationRegistration.unregister(criticalClass);
    }

    ReceivePipeline<Critical> pipeline = communicationRegistration.register(criticalClass);
    list.forEach(pipeline::addLast);

    pipeline.close();
    pipeline.seal();
}

ReceivePipelineHandlerPolicy

The ReceivePipelineHandlerPolicy is an enum, that describes the behavior of the .to() method of the ReceivePipeline and can be set via .setReceivePipelineHandlerPolicy(). With that method, you can add real methods of existing Objects to handle (part of) a Communication (in an com.google.guava.EventBus style). Those handler-methods can be registered in the following way:

class Test {
    // the name is irrelevant
    // Also, you can get the Session and Connection injected, by declaring it in the method.
    @ReceiveHandler
    public void handle(TestObject testObject) {
        // do something with this ..
    }
}

public class Main {

    private Test test = new Test();

    public void register(CommunicationRegistration communicationRegistration) {
        ...
        // other stuff
        ...

        communicationRegistration.register(TestObject.class).to(test);
    }
}

The line communicationRegistration.register(TestObject.class).to(test); behaves differntly, depending on the set ReceivePipelineHandlerPolicy. It consist of 3 Types:

  • NOT_ALLOWED
  • ALLOW_MULTIPLE
  • ALLOW_SINGLE

The line will throw an PipelineAccessException if the ReceivePipeline is set to: NOT_ALLOWED.
ALLOW_MULTIPLE will accept any number of Methods, handling Objects as well as instances from the OnReceiveFamily.
ALLOW_SINGLE will clear the existing pipeline of any other handler (OnReceive or method), than add it to handle the Communication and lastly close and seal the pipeline, so that this method and this method only will handle the Communication for that ReceivePipeline.

Clone this wiki locally