diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index ff8d162f..72bbf737 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -29,6 +29,7 @@ import ( "encoding/json" "errors" "io" + "os/exec" "reflect" "strings" "syscall" @@ -71,48 +72,39 @@ type CommitResponse struct { } func (t *ControllableTask) Launch() error { - log.WithFields(logrus.Fields{ + defaultLogFields := logrus.Fields{ "taskId": t.ti.TaskID.GetValue(), "taskName": t.ti.Name, - "level": infologger.IL_Devel, "partition": t.knownEnvironmentId.String(), "detector": t.knownDetector, - }).Debug("executor.ControllableTask.Launch begin") + } + + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch begin") launchStartTime := time.Now() defer utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch", - log.WithFields(logrus.Fields{ - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - })) + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) t.pendingFinalTaskStateCh = make(chan mesos.TaskState, 1) // we use this to receive a pending status update if the task was killed taskCmd, err := prepareTaskCmd(t.Tci) if err != nil { msg := "cannot build task command" - log.WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }). + log.WithFields(defaultLogFields). + WithError(err). Error(msg) t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, msg+": "+err.Error()) return err } - log.WithField("payload", string(t.ti.GetData()[:])). - WithField("task", t.ti.Name). - WithField("level", infologger.IL_Devel). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). + WithField("payload", string(t.ti.GetData()[:])). + WithField(infologger.Level, infologger.IL_Devel). Debug("starting task asynchronously") // We fork out into a goroutine for the actual process management. @@ -120,523 +112,416 @@ func (t *ControllableTask) Launch() error { // Anything in the following goroutine must not touch *internalState, except // via channels. go func() { - truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) - log.WithFields(logrus.Fields{ - "cmd": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }).Debug("executor.ControllableTask.Launch.async begin") - - // Set up pipes for controlled process - var errStdout, errStderr error - stdoutIn, _ := taskCmd.StdoutPipe() - stderrIn, _ := taskCmd.StderrPipe() - - err = taskCmd.Start() - if err != nil { - log.WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err.Error(), - "command": truncatedCmd, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }). - Error("failed to run task") - - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) - return - } - log.WithField("id", t.ti.TaskID.Value). - WithField("task", t.ti.Name). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - Debug("task launched") - - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "cmd": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - if t.Tci.Stdout == nil { - none := "none" - t.Tci.Stdout = &none + t.doLaunchTask(taskCmd, launchStartTime) + }() + + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") + return nil +} + +func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time.Time) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) + + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async begin") + + // Set up pipes for controlled process. They have to be retrieved before starting the task. + stdoutIn, _ := taskCmd.StdoutPipe() + stderrIn, _ := taskCmd.StderrPipe() + + err := taskCmd.Start() + if err != nil { + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + Error("failed to run task") + + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + // fixme: i confirmed on staging, that's a nil access! taskCmd.Process is not set if Start() fails + _ = t.doTermIntKill(-taskCmd.Process.Pid) + // fixme: shouldn't we also close pipes, as we do in some other error cases later? + return + } + log.WithFields(defaultLogFields).Debug("task launched") + + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + t.initTaskStdLogging(stdoutIn, stderrIn) + + log.WithFields(defaultLogFields). + WithFields(logrus.Fields{ + "controlPort": t.Tci.ControlPort, + "controlMode": t.Tci.ControlMode.String(), + "path": taskCmd.Path, + "argv": "[ " + strings.Join(taskCmd.Args, ", ") + " ]", + "argc": len(taskCmd.Args), + infologger.Level: infologger.IL_Devel, + }).Debug("starting gRPC client") + + controlTransport := executorcmd.ProtobufTransport + for _, v := range taskCmd.Args { + if strings.Contains(v, "-P OCClite") { + controlTransport = executorcmd.JsonTransport + break } - if t.Tci.Stderr == nil { - none := "none" - t.Tci.Stderr = &none + } + + rpcDialStartTime := time.Now() + t.rpc = executorcmd.NewClient( + t.Tci.ControlPort, + t.Tci.ControlMode, + controlTransport, + log.WithPrefix("executorcmd"). + WithFields(defaultLogFields), + ) + if t.rpc == nil { + err = errors.New("rpc client is nil") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + WithField(infologger.Level, infologger.IL_Devel). + Error("could not start gRPC client") + + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + _ = t.doTermIntKill(-taskCmd.Process.Pid) + return + } + t.rpc.TaskCmd = taskCmd + + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + utils.TimeTrack(rpcDialStartTime, + "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + err = t.pollTaskForStandbyState() + if err != nil { + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + + _ = t.rpc.Close() + t.rpc = nil + + pid := t.knownPid + if pid == 0 { + // The pid was never known through a successful `GetState` in the lifetime + // of this process, so we must rely on the PGID of the containing shell + pid = -taskCmd.Process.Pid } + log.WithFields(defaultLogFields). + Debug("sending SIGKILL (9) to task") + _ = syscall.Kill(pid, syscall.SIGKILL) // fixme: not sure why we do it differently than elsewhere (doTermIntKill) + _ = stdoutIn.Close() + _ = stderrIn.Close() + + log.WithFields(defaultLogFields). + Debug("task killed") + return + } + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + // Set up event stream from task + esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) + if err != nil { + log.WithFields(defaultLogFields). + WithError(err). + Error("cannot set up event stream from task") + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + _ = t.rpc.Close() + t.rpc = nil + // fixme: why don't we kill the task in this error case, but we do in others? + return + } + + // send RUNNING + t.sendStatus(t.knownEnvironmentId, mesos.TASK_RUNNING, "") + taskMessage := event.NewAnnounceTaskPIDEvent(t.ti.TaskID.GetValue(), int32(t.knownPid)) + taskMessage.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) + + jsonEvent, err := json.Marshal(taskMessage) + if err != nil { + log.WithFields(defaultLogFields). + WithError(err). + Warning("error marshaling message") + } else { + t.sendMessage(jsonEvent) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") + } + + // Process events from task in yet another goroutine + go func() { + t.processEventsFromTask(esc) + }() + + err = taskCmd.Wait() + // ^ when this unblocks, the task is done + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task done (taskCmd.Wait unblocks), preparing final update") + + pendingState := mesos.TASK_FINISHED + if err != nil { + taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Ops). + Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). + Error("task terminated with error (details):") + pendingState = mesos.TASK_FAILED + } + + select { + case pending := <-t.pendingFinalTaskStateCh: + pendingState = pending + default: + } + + if t.rpc != nil { + _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much + log.WithFields(defaultLogFields). + Debug("rpc client closed") + t.rpc = nil + log.WithFields(defaultLogFields). + Debug("rpc client removed") + } + + t.sendStatus(t.knownEnvironmentId, pendingState, "") + return +} + +func (t *ControllableTask) initTaskStdLogging(stdoutIn io.ReadCloser, stderrIn io.ReadCloser) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + + if t.Tci.Stdout == nil { + none := "none" + t.Tci.Stdout = &none + } + if t.Tci.Stderr == nil { + none := "none" + t.Tci.Stderr = &none + } + + go func() { + var errStdout error switch *t.Tci.Stdout { case "stdout": - go func() { - entry := log.WithPrefix("task-stdout"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). - WithField("nohooks", true) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Debug, - } - _, errStdout = io.Copy(writer, stdoutIn) - writer.Flush() - }() + entry := log.WithPrefix("task-stdout"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithField("nohooks", true) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Debug, + } + _, errStdout = io.Copy(writer, stdoutIn) + writer.Flush() case "all": - go func() { - entry := log.WithPrefix("task-stdout"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Debug, - } - _, errStdout = io.Copy(writer, stdoutIn) - writer.Flush() - }() + entry := log.WithPrefix("task-stdout"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Debug, + } + _, errStdout = io.Copy(writer, stdoutIn) + writer.Flush() default: - go func() { - _, errStdout = io.Copy(io.Discard, stdoutIn) - }() + _, errStdout = io.Copy(io.Discard, stdoutIn) } + if errStdout != nil { + log.WithFields(defaultLogFields). + WithError(errStdout). + WithField(infologger.Level, infologger.IL_Devel). + Warning("failed to capture stdout of task") + } + }() + go func() { + var errStderr error switch *t.Tci.Stderr { case "stdout": - go func() { - entry := log.WithPrefix("task-stderr"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). - WithField("nohooks", true) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Warn, - } - _, errStderr = io.Copy(writer, stderrIn) - writer.Flush() - }() + entry := log.WithPrefix("task-stderr"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithField("nohooks", true) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Warn, + } + _, errStderr = io.Copy(writer, stderrIn) + writer.Flush() case "all": - go func() { - entry := log.WithPrefix("task-stderr"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Warn, - } - _, errStderr = io.Copy(writer, stderrIn) - writer.Flush() - }() + entry := log.WithPrefix("task-stderr"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Warn, + } + _, errStderr = io.Copy(writer, stderrIn) + writer.Flush() default: - go func() { - _, errStderr = io.Copy(io.Discard, stderrIn) - }() + _, errStderr = io.Copy(io.Discard, stderrIn) } - - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "controlPort": t.Tci.ControlPort, - "controlMode": t.Tci.ControlMode.String(), - "task": t.ti.Name, - "id": t.ti.TaskID.Value, - "path": taskCmd.Path, - "argv": "[ " + strings.Join(taskCmd.Args, ", ") + " ]", - "argc": len(taskCmd.Args), - "level": infologger.IL_Devel, - }). - Debug("starting gRPC client") - - controlTransport := executorcmd.ProtobufTransport - for _, v := range taskCmd.Args { - if strings.Contains(v, "-P OCClite") { - controlTransport = executorcmd.JsonTransport - break - } + if errStderr != nil { + log.WithFields(defaultLogFields). + WithError(errStderr). + WithField(infologger.Level, infologger.IL_Devel). + Warning("failed to capture stderr of task") } + }() +} - rpcDialStartTime := time.Now() +func (t *ControllableTask) pollTaskForStandbyState() error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + statePollingStartTime := time.Now() + elapsed := 0 * time.Second + for { + log.WithFields(defaultLogFields). + WithField("elapsed", elapsed.String()). + WithField(infologger.Level, infologger.IL_Devel). + Debug("polling task for STANDBY state reached") + + response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) + if err != nil { + log.WithError(err). + WithFields(defaultLogFields). + WithField("state", response.GetState()). + Info("cannot query task status") + } else { + log.WithFields(defaultLogFields). + WithField("state", response.GetState()). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task status queried") + t.knownPid = int(response.GetPid()) + } + // NOTE: we acquire the transitioner-dependent STANDBY equivalent state + // fixme: that's a possible nil access there, because we do not "continue" on error + reachedState := t.rpc.FromDeviceState(response.GetState()) + + if reachedState == "STANDBY" && err == nil { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task running and ready for control input") + break + } else if reachedState == "DONE" || reachedState == "ERROR" { + // something went wrong, the device moved to DONE or ERROR on startup + return errors.New("task reached wrong state on startup") + } else if elapsed >= startupTimeout { + return errors.New("timeout while trying to poll task") + } else { + log.WithFields(defaultLogFields). + Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) + time.Sleep(startupPollingInterval) + elapsed += startupPollingInterval + } + } - t.rpc = executorcmd.NewClient( - t.Tci.ControlPort, - t.Tci.ControlMode, - controlTransport, - log.WithPrefix("executorcmd"). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - }, - ), - ) - if t.rpc == nil { - err = errors.New("rpc client is nil") - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err.Error(), - "command": truncatedCmd, - }). - WithField("level", infologger.IL_Devel). - Error("could not start gRPC client") + utils.TimeTrack(statePollingStartTime, + "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + return nil +} - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) - return - } - t.rpc.TaskCmd = taskCmd +func (t *ControllableTask) processEventsFromTask(esc pb.Occ_EventStreamClient) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + deo := event.DeviceEventOrigin{ + AgentId: t.ti.AgentID, + ExecutorId: t.ti.GetExecutor().ExecutorID, + TaskId: t.ti.TaskID, + } - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - utils.TimeTrack(rpcDialStartTime, - "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - statePollingStartTime := time.Now() - elapsed := 0 * time.Second - for { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "elapsed": elapsed.String(), - "level": infologger.IL_Devel, - }). - Debug("polling task for IDLE state reached") - - response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) - if err != nil { - log.WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "state": response.GetState(), - "task": t.ti.Name, - "command": truncatedCmd, - }). - Info("cannot query task status") - } else { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "state": response.GetState(), - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }). - Debug("task status queried") - t.knownPid = int(response.GetPid()) - } - // NOTE: we acquire the transitioner-dependent STANDBY equivalent state - reachedState := t.rpc.FromDeviceState(response.GetState()) - - if reachedState == "STANDBY" && err == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("id", t.ti.TaskID.Value). - WithField("task", t.ti.Name). - WithField("command", truncatedCmd). - WithField("level", infologger.IL_Devel). - Debug("task running and ready for control input") - break - } else if reachedState == "DONE" || reachedState == "ERROR" { - // something went wrong, the device moved to DONE or ERROR on startup - pid := t.knownPid - if pid == 0 { - // The pid was never known through a successful `GetState` in the lifetime - // of this process, so we must rely on the PGID of the containing shell - pid = -t.rpc.TaskCmd.Process.Pid - } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.Name). - Debug("sending SIGKILL (9) to task") - _ = syscall.Kill(pid, syscall.SIGKILL) - _ = stdoutIn.Close() - _ = stderrIn.Close() - - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name).Debug("task killed") - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, "task reached wrong state on startup") - return - } else if elapsed >= startupTimeout { - err = errors.New("timeout while waiting for task startup") - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name).Error(err.Error()) - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil - - _ = stdoutIn.Close() - _ = stderrIn.Close() - - return - } else { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). - WithField("command", truncatedCmd). - Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) - time.Sleep(startupPollingInterval) - elapsed += startupPollingInterval - } + for { + if t.rpc == nil { + log.WithFields(defaultLogFields). + Debug("event stream done") + break } - - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - utils.TimeTrack(statePollingStartTime, - "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - // Set up event stream from task - esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) - if err != nil { - log.WithField("task", t.ti.Name). + esr, err := esc.Recv() + if err == io.EOF { + log.WithFields(defaultLogFields). WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - Error("cannot set up event stream from task") - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil - return + Debug("event stream EOF") + break } - - // send RUNNING - t.sendStatus(t.knownEnvironmentId, mesos.TASK_RUNNING, "") - taskMessage := event.NewAnnounceTaskPIDEvent(t.ti.TaskID.GetValue(), int32(t.knownPid)) - taskMessage.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - - jsonEvent, err := json.Marshal(taskMessage) if err != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). + WithField("errorType", reflect.TypeOf(err)). + WithField(infologger.Level, infologger.IL_Devel). WithError(err). - Warning("error marshaling message") - } else { - t.sendMessage(jsonEvent) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - }).Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") - } - - // Process events from task in yet another goroutine - go func() { - deo := event.DeviceEventOrigin{ - AgentId: t.ti.AgentID, - ExecutorId: t.ti.GetExecutor().ExecutorID, - TaskId: t.ti.TaskID, - } - for { - if t.rpc == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). - WithError(err). - Debug("event stream done") - break - } - esr, err := esc.Recv() - if err == io.EOF { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). - WithError(err). - Debug("event stream EOF") - break - } - if err != nil { - log.WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("errorType", reflect.TypeOf(err)). - WithField("level", infologger.IL_Devel). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). - Warning("error receiving event") - if status.Code(err) == codes.Unavailable { - break - } - continue - } - ev := esr.GetEvent() - - deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) - if deviceEvent == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). - Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") - break - } else { - taskId := deo.TaskId.Value - - if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", taskId). - WithField("taskName", t.ti.Name). - WithField("taskPid", t.knownPid). - Debug("END_OF_STREAM DeviceEvent received - notifying environment") - } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", taskId). - WithField("taskName", t.ti.Name). - WithField("taskPid", t.knownPid). - WithField("level", infologger.IL_Support). - Warningf("task transitioned to ERROR on its own - notifying environment") - } - } - deviceEvent.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - - t.sendDeviceEvent(t.knownEnvironmentId, deviceEvent) + Warning("error receiving event") + if status.Code(err) == codes.Unavailable { + break } - }() - - err = taskCmd.Wait() - // ^ when this unblocks, the task is done - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }).Debug("task done (taskCmd.Wait unblocks), preparing final update") - - pendingState := mesos.TASK_FINISHED - if err != nil { - taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("level", infologger.IL_Ops). - Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "error": err.Error(), - "level": infologger.IL_Devel, - }). - Error("task terminated with error (details):") - pendingState = mesos.TASK_FAILED - } - - select { - case pending := <-t.pendingFinalTaskStateCh: - pendingState = pending - default: + // fixme: we also get codes.Canceled sometimes, it's probably OK and we should not complain + continue } + ev := esr.GetEvent() - if t.rpc != nil { - _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). - Debug("rpc client closed") - t.rpc = nil - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). - Debug("rpc client removed") - } - - if errStdout != nil || errStderr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "errStderr": errStderr, - "errStdout": errStdout, - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }). - Warning("failed to capture stdout or stderr of task") + deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) + if deviceEvent == nil { + log.WithFields(defaultLogFields). + Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") + break + } else { + if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { + log.WithFields(defaultLogFields). + WithField("taskPid", t.knownPid). + Debug("END_OF_STREAM DeviceEvent received - notifying environment") + } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { + log.WithFields(defaultLogFields). + WithField("taskPid", t.knownPid). + WithField(infologger.Level, infologger.IL_Support). + Warningf("task transitioned to ERROR on its own - notifying environment") + } } + deviceEvent.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - t.sendStatus(t.knownEnvironmentId, pendingState, "") - }() - - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "task": t.ti.Name, - "level": infologger.IL_Devel, - }). - Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") - return nil + t.sendDeviceEvent(t.knownEnvironmentId, deviceEvent) + } } func (t *ControllableTask) UnmarshalTransition(data []byte) (cmd *executorcmd.ExecutorCommand_Transition, err error) { @@ -662,6 +547,13 @@ func (t *ControllableTask) Transition(cmd *executorcmd.ExecutorCommand_Transitio } func (t *ControllableTask) Kill() error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + var ( pid = 0 reachedState = "UNKNOWN" // FIXME: should be LAUNCHING or similar @@ -670,11 +562,9 @@ func (t *ControllableTask) Kill() error { defer cancel() response, err := t.rpc.GetState(cxt, &pb.GetStateRequest{}, grpc.EmptyCallOption{}) if err == nil { // we successfully got the state from the task - log.WithField("nativeState", response.GetState()). - WithField("taskId", t.ti.GetTaskID()). - WithField("level", infologger.IL_Devel). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). + WithField("nativeState", response.GetState()). + WithField(infologger.Level, infologger.IL_Devel). Debug("task status queried for upcoming soft kill") // NOTE: we acquire the transitioner-dependent STANDBY equivalent state @@ -717,14 +607,13 @@ func (t *ControllableTask) Kill() error { for reachedState != "DONE" { cmd := nextTransition(reachedState) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithFields(logrus.Fields{ - "evt": cmd.Event, - "src": cmd.Source, - "dst": cmd.Destination, - "targetList": cmd.TargetList, - "level": infologger.IL_Devel, + "evt": cmd.Event, + "src": cmd.Source, + "dst": cmd.Destination, + "targetList": cmd.TargetList, + infologger.Level: infologger.IL_Devel, }). Debug("state DONE not reached, about to commit transition") @@ -741,10 +630,8 @@ func (t *ControllableTask) Kill() error { select { case commitResponse = <-commitDone: case <-time.After(KILL_TRANSITION_TIMEOUT): - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Warn("teardown transition sequence timed out") } // timeout we should break @@ -752,29 +639,23 @@ func (t *ControllableTask) Kill() error { break } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithField("newState", commitResponse.newState). WithError(commitResponse.transitionError). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Debug("transition committed") if commitResponse.transitionError != nil || len(cmd.Event) == 0 { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(commitResponse.transitionError). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Warn("teardown transition sequence error") break } reachedState = commitResponse.newState } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Debug("teardown transition sequence done") pid = int(response.GetPid()) if pid == 0 { @@ -785,10 +666,8 @@ func (t *ControllableTask) Kill() error { // If GetState didn't succeed during this Kill code path, but might still have // at some earlier point during the lifetime of this task. // Either way, we might or might not have the true PID. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(err). - WithField("taskId", t.ti.GetTaskID()). Warn("cannot query task status for graceful process termination") pid = t.knownPid if pid == 0 { @@ -802,9 +681,8 @@ func (t *ControllableTask) Kill() error { // terminate the shell that is wrapping the command, so we avoid using // negative PID is all other cases in order to allow FairMQ cleanup to // run. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithError(err).WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). + WithError(err). Warn("task PID not known from task, using containing shell PGID") } } @@ -813,16 +691,12 @@ func (t *ControllableTask) Kill() error { t.rpc = nil if reachedState == "DONE" { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.Value). + log.WithFields(defaultLogFields). Debugf("task reached DONE, will wait %.1fs before terminating it", DONE_TIMEOUT.Seconds()) t.pendingFinalTaskStateCh <- mesos.TASK_FINISHED time.Sleep(DONE_TIMEOUT) } else { // something went wrong - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.Value). + log.WithFields(defaultLogFields). Debug("task died already or will be killed soon") t.pendingFinalTaskStateCh <- mesos.TASK_KILLED } @@ -830,25 +704,26 @@ func (t *ControllableTask) Kill() error { if pidExists(pid) { return t.doTermIntKill(pid) } else { - log.WithField("taskId", t.ti.GetTaskID()). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). Debugf("task terminated on its own") return nil } } func (t *ControllableTask) doKill9(pid int) error { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + + log.WithFields(defaultLogFields). Debug("sending SIGKILL (9) to task") killErr := syscall.Kill(pid, syscall.SIGKILL) if killErr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(killErr). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGKILL failed") } @@ -856,18 +731,21 @@ func (t *ControllableTask) doKill9(pid int) error { } func (t *ControllableTask) doTermIntKill(pid int) error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + killErrCh := make(chan error) go func() { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). Debug("sending SIGTERM (15) to task") err := syscall.Kill(pid, syscall.SIGTERM) if err != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(err). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGTERM failed") } killErrCh <- err @@ -882,16 +760,12 @@ func (t *ControllableTask) doTermIntKill(pid int) error { if pidExists(pid) { // SIGINT for the "Waiting for graceful device shutdown. // Hit Ctrl-C again to abort immediately" message. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). Debug("sending SIGINT (2) to task") killErr = syscall.Kill(pid, syscall.SIGINT) if killErr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(killErr). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGINT failed") } time.Sleep(SIGINT_TIMEOUT)