From 95138dca4dc75b557cfeb7b64334bbd1ac1cfbc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Wed, 10 Dec 2025 15:55:01 +0100 Subject: [PATCH] [core] part of OCTRL-1076, fixing leaked logging goroutines --- executor/executable/basictaskcommon.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/executor/executable/basictaskcommon.go b/executor/executable/basictaskcommon.go index 16627096..014d640b 100644 --- a/executor/executable/basictaskcommon.go +++ b/executor/executable/basictaskcommon.go @@ -28,12 +28,13 @@ import ( "bytes" "encoding/json" "errors" - "github.com/AliceO2Group/Control/common/utils" "io" "os/exec" "syscall" "time" + "github.com/AliceO2Group/Control/common/utils" + "github.com/AliceO2Group/Control/common/controlmode" "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" @@ -73,6 +74,8 @@ func (t *basicTaskBase) startBasicTask() (err error) { var errStdout, errStderr error var stdoutBuf, stderrBuf bytes.Buffer var stdout, stderr io.Writer + // To be closed after task is done + var stdoutLog, stderrLog *io.PipeWriter if t.Tci.Stdout == nil { none := "none" @@ -85,7 +88,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { switch *t.Tci.Stdout { case "stdout": - stdoutLog := log.WithPrefix("task-stdout"). + stdoutLog = log.WithPrefix("task-stdout"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -97,7 +100,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { stdout = io.MultiWriter(stdoutLog, &stdoutBuf) case "all": - stdoutLog := log.WithPrefix("task-stdout"). + stdoutLog = log.WithPrefix("task-stdout"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -112,7 +115,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { switch *t.Tci.Stderr { case "stdout": - stderrLog := log.WithPrefix("task-stderr"). + stderrLog = log.WithPrefix("task-stderr"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -124,7 +127,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { stderr = io.MultiWriter(stderrLog, &stderrBuf) case "all": - stderrLog := log.WithPrefix("task-stderr"). + stderrLog = log.WithPrefix("task-stderr"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -142,7 +145,6 @@ func (t *basicTaskBase) startBasicTask() (err error) { stderrIn, _ := t.taskCmd.StderrPipe() err = t.taskCmd.Start() - if err != nil { log.WithField("partition", t.knownEnvironmentId.String()). WithFields(logrus.Fields{ @@ -172,6 +174,13 @@ func (t *basicTaskBase) startBasicTask() (err error) { err = taskCmd.Wait() // ^ when this unblocks, the task is done + if stdoutLog != nil { + stdoutLog.Close() + } + if stderrLog != nil { + stderrLog.Close() + } + pendingState := mesos.TASK_FINISHED var tciCommandStr string if t.Tci.Value != nil {