Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ o2_add_library(Framework
src/SimpleOptionsRetriever.cxx
src/O2ControlHelpers.cxx
src/O2ControlLabels.cxx
src/CommonLabels.cxx
src/O2ControlParameters.cxx
src/O2DataModelHelpers.cxx
src/OutputSpec.cxx
Expand Down
26 changes: 26 additions & 0 deletions Framework/Core/include/Framework/CommonLabels.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_FRAMEWORK_COMMONLABELS_H
#define O2_FRAMEWORK_COMMONLABELS_H

#include "Framework/DataProcessorLabel.h"

namespace o2::framework
{

// Label to disable forwarding/advertising of DomainInfoHeader (oldest possible outputs)
// When present on a DataProcessor, no DomainInfoHeader messages will be sent downstream.
const extern DataProcessorLabel suppressDomainInfoLabel;

} // namespace o2::framework

#endif // O2_FRAMEWORK_COMMONLABELS_H
19 changes: 19 additions & 0 deletions Framework/Core/src/CommonLabels.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/CommonLabels.h"

namespace o2::framework
{

const DataProcessorLabel suppressDomainInfoLabel = {"suppress-domain-info"};

} // namespace o2::framework
7 changes: 7 additions & 0 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "Framework/DefaultsHelpers.h"
#include "Framework/Signpost.h"
#include "Framework/DriverConfig.h"
#include "Framework/CommonLabels.h"

#include "TextDriverClient.h"
#include "WSDriverClient.h"
Expand Down Expand Up @@ -604,6 +605,12 @@ o2::framework::ServiceSpec
break;
}
}
for (const auto& label : services.get<DeviceSpec const>().labels) {
if (label == suppressDomainInfoLabel) {
decongestion->suppressDomainInfo = true;
break;
}
}
auto& queue = services.get<AsyncQueue>();
decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100});
return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};
Expand Down
7 changes: 7 additions & 0 deletions Framework/Core/src/DataProcessingHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "Framework/DeviceStateEnums.h"
#include "Headers/DataHeader.h"
#include "Framework/DataProcessingHeader.h"
#include "DecongestionService.h"

#include <fairmq/Device.h>
#include <fairmq/Channel.h>
Expand Down Expand Up @@ -83,6 +84,9 @@ void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFa

bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, ForwardChannelInfo const& info, ForwardChannelState& state, size_t timeslice)
{
if (ref.get<DecongestionService>().suppressDomainInfo) {
return false;
}
if (state.oldestForChannel.value >= timeslice) {
return false;
}
Expand All @@ -93,6 +97,9 @@ bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const

bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, OutputChannelInfo const& info, OutputChannelState& state, size_t timeslice)
{
if (ref.get<DecongestionService>().suppressDomainInfo) {
return false;
}
if (state.oldestForChannel.value >= timeslice) {
return false;
}
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/DecongestionService.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace o2::framework
struct DecongestionService {
/// Wether we are a source in the processing chain
bool isFirstInTopology = true;
/// do not advertise/forward DomainInfoHeader from this device
bool suppressDomainInfo = false;
/// The last timeslice which the ExpirationHandler::Creator callback
/// created. This can be used to skip dummy iterations.
size_t nextEnumerationTimeslice = 0;
Expand Down