core/instance/states_render.go (4 changes: 3 additions & 1 deletion)
@@ -5,6 +5,7 @@ import (
 	"strings"
 
 	"github.com/opensvc/om3/v3/core/colorstatus"
+	"github.com/opensvc/om3/v3/core/naming"
 	"github.com/opensvc/om3/v3/core/provisioned"
 	"github.com/opensvc/om3/v3/core/rawconfig"
 	"github.com/opensvc/om3/v3/core/resource"
@@ -46,7 +47,8 @@ func (t States) LoadTreeNodeFolded(head *tree.Node) {
 func (t States) LoadTreeNode(head *tree.Node) {
 	head.AddColumn().AddText(t.Node.Name).SetColor(rawconfig.Color.Bold)
 	head.AddColumn()
-	if t.Config.ActorConfig == nil {
+	switch t.Path.Kind {
+	case naming.KindSec, naming.KindCfg, naming.KindCcfg, naming.KindUsr, naming.KindNscfg:
 		head.AddColumn()
 		head.AddColumn().AddText(t.descString())
 		return
core/object/actor_status.go (33 changes: 18 additions & 15 deletions)
@@ -218,7 +218,6 @@ func (t *actor) resourceStatusEval(ctx context.Context, data *instance.Status, m
 	var (
 		resourceStatus      resource.Status
 		encapInstanceStatus *instance.EncapStatus
-		err                 error
 	)
 
 	if v, err := t.isEncapNodeMatchingResource(r); err != nil {
@@ -245,20 +244,24 @@ func (t *actor) resourceStatusEval(ctx context.Context, data *instance.Status, m
 	}
 
 	// If the resource is a encap capable container, evaluate the encap instance
-	if encapContainer, ok := r.(resource.Encaper); ok {
-		if resourceStatus.Status.Is(status.Up, status.StandbyUp) {
-			if encapInstanceStatus, err = t.resourceStatusEvalEncap(ctx, encapContainer, false); err != nil {
-				log := resource.NewStatusLog(resourceStatus.Log...)
-				log.Error("%s", err)
-				resourceStatus.Log = log.Entries()
-			}
-		} else {
-			encapInstanceStatus = &instance.EncapStatus{
-				Status: instance.Status{
-					Avail:   status.Down,
-					Overall: status.Down,
-				},
-				Hostname: encapContainer.GetHostname(),
+	if encapNodes, err := t.EncapNodes(); err != nil {
+		return err
+	} else if len(encapNodes) > 0 {
+		if encapContainer, ok := r.(resource.Encaper); ok {
+			if resourceStatus.Status.Is(status.Up, status.StandbyUp) {
+				if encapInstanceStatus, err = t.resourceStatusEvalEncap(ctx, encapContainer, false); err != nil {
+					log := resource.NewStatusLog(resourceStatus.Log...)
+					log.Error("%s", err)
+					resourceStatus.Log = log.Entries()
+				}
+			} else {
+				encapInstanceStatus = &instance.EncapStatus{
+					Status: instance.Status{
+						Avail:   status.Down,
+						Overall: status.Down,
+					},
+					Hostname: encapContainer.GetHostname(),
+				}
+			}
 		}
 	}
core/resource/resource.go (20 changes: 11 additions & 9 deletions)
@@ -1226,15 +1226,14 @@ func GetStatus(ctx context.Context, r Driver) Status {
 	// on containers it will set the initial inspect.
 	resStatus := EvalStatus(ctx, r)
 	return Status{
-		Label:         formatResourceLabel(ctx, r),
-		Type:          r.Manifest().DriverID.String(),
-		Status:        resStatus,
-		Subset:        r.RSubset(),
-		Tags:          r.TagSet(),
-		Log:           r.StatusLog().Entries(),
-		IsProvisioned: getProvisionStatus(ctx, r),
-		Info:          getStatusInfo(ctx, r),
-		Files:         getFiles(ctx, r),
+		Label:  formatResourceLabel(ctx, r),
+		Type:   r.Manifest().DriverID.String(),
+		Status: resStatus,
+		Subset: r.RSubset(),
+		Tags:   r.TagSet(),
+		Log:    r.StatusLog().Entries(),
+		Info:   getStatusInfo(ctx, r),
+		Files:  getFiles(ctx, r),
 
 		IsStopped:   r.IsStopped(),
 		IsMonitored: r.IsMonitored(),
@@ -1243,6 +1242,9 @@ func GetStatus(ctx context.Context, r Driver) Status {
 		IsStandby:  r.IsStandby(),
 		IsDisabled: r.IsDisabled(),
 		IsEncap:    r.IsEncap(),
+
+		// keep last because all previous func calls can add entries
+		IsProvisioned: getProvisionStatus(ctx, r),
 	}
 }
 
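The IsProvisioned move relies on Go's evaluation order: method and function calls inside a composite literal are evaluated in lexical left-to-right order, so getProvisionStatus only observes status-log entries appended by the calls written before it. A minimal standalone sketch of that ordering guarantee, with hypothetical names that are not part of this repository:

package main

import "fmt"

type statusLog struct{ entries []string }

// check records an entry and returns the checked name.
func (l *statusLog) check(name string) string {
	l.entries = append(l.entries, "checked "+name)
	return name
}

// count reports how many entries have been recorded so far.
func (l *statusLog) count() int { return len(l.entries) }

type report struct {
	label string
	info  string
	// keep last: its value depends on entries added by the calls above
	seen int
}

func main() {
	l := &statusLog{}
	r := report{
		label: l.check("label"),
		info:  l.check("info"),
		seen:  l.count(), // evaluated after the two check calls above
	}
	fmt.Println(r.label, r.info, r.seen) // prints: label info 2
}

Listing a log-dependent field last in the literal is therefore enough to make it see every entry the earlier field initializers appended.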
daemon/scheduler/main.go (47 changes: 44 additions & 3 deletions)
@@ -23,6 +23,7 @@ import (
 	"github.com/opensvc/om3/v3/core/resourcereqs"
 	"github.com/opensvc/om3/v3/core/schedule"
 	"github.com/opensvc/om3/v3/core/status"
+	"github.com/opensvc/om3/v3/core/topology"
 	"github.com/opensvc/om3/v3/daemon/daemondata"
 	"github.com/opensvc/om3/v3/daemon/daemonsubsystem"
 	"github.com/opensvc/om3/v3/daemon/msgbus"
@@ -46,6 +47,7 @@ type (
 		jobs                Jobs
 		enabled             bool
 		provisioned         map[naming.Path]bool
+		failover            map[naming.Path]bool
 		schedules           Schedules
 		isCollectorJoinable bool
 
@@ -148,6 +150,7 @@ func New(subQS pubsub.QueueSizer, opts ...funcopt.O) *T {
 		events:            make(chan any),
 		jobs:              make(Jobs),
 		schedules:         make(Schedules),
+		failover:          make(map[naming.Path]bool),
 		provisioned:       make(map[naming.Path]bool),
 		subQS:             subQS,
 		lastRunOnAllPeers: make(timeMap),
@@ -259,6 +262,29 @@ func (t Job) Cancel() {
 	t.cancel = nil
 }
 
+func (t *T) peerInstanceLastRun(e schedule.Entry) time.Time {
+	if e.Path.IsZero() {
+		return time.Time{}
+	}
+	if !t.isFailover(e.Path) {
+		return time.Time{}
+	}
+	if e.Config.Require == "" {
+		return time.Time{}
+	}
+	if strings.Contains(e.Config.Require, "down") {
+		return time.Time{}
+	}
+	if strings.Contains(e.Config.Require, "warn") {
+		return time.Time{}
+	}
+	lastRunOnAllPeers, ok := t.lastRunOnAllPeers.Get(e.Path, e.Key)
+	if !ok {
+		return time.Time{}
+	}
+	return lastRunOnAllPeers
+}
+
 func (t *T) createJob(e schedule.Entry) {
 	if !t.enabled {
 		return
@@ -277,9 +303,10 @@ func (t *T) createJob(e schedule.Entry) {
 		// after daemon start: initialize the schedule's LastRunAt from LastRunFile
 		e.LastRunAt = e.GetLastRun()
 	}
-	if lastRunOnAllPeers, ok := t.lastRunOnAllPeers.Get(e.Path, e.Key); ok && e.LastRunAt.Before(lastRunOnAllPeers) {
-		logger.Infof("adjust schedule entry last run time: %s => %s", e.LastRunAt, lastRunOnAllPeers)
-		e.LastRunAt = lastRunOnAllPeers
+
+	if tm := t.peerInstanceLastRun(e); e.LastRunAt.Before(tm) {
+		logger.Infof("adjust schedule entry last run time: %s => %s", e.LastRunAt, tm)
+		e.LastRunAt = tm
 	}
 
 	now := time.Now() // keep before GetNext call
@@ -324,6 +351,11 @@ func (t *T) jobLogger(e schedule.Entry) *plog.Logger {
 	return logger.WithPrefix(prefix)
 }
 
+func (t *T) isFailover(path naming.Path) bool {
+	isFailover, hasFailover := t.failover[path]
+	return hasFailover && isFailover
+}
+
 func (t *T) isProvisioned(path naming.Path) bool {
 	isProvisioned, hasProvisioned := t.provisioned[path]
 	return hasProvisioned && isProvisioned
@@ -356,6 +388,12 @@ func (t *T) onJobAlarm(c eventJobAlarm) {
 		}
 	}
 
+	if tm := t.peerInstanceLastRun(e); c.schedule.LastRunAt.Before(tm) {
+		logger.Infof("aborted, job ran on peer at %s", tm)
+		t.recreateJobFrom(e, tm)
+		return
+	}
+
 	// plan the next run before exec, so another exec can be done
 	// even if another is running
 	e.LastRunAt = c.schedule.LastRunAt
@@ -745,6 +783,8 @@ func (t *T) onDaemonCollectorUpdated(c *msgbus.DaemonCollectorUpdated) {
 func (t *T) onObjectStatusDeleted(c *msgbus.ObjectStatusDeleted) {
 	t.lastRunOnAllPeers.UnsetPath(c.Path)
 	t.reqSatisfied.UnsetPath(c.Path)
+	delete(t.provisioned, c.Path)
+	delete(t.failover, c.Path)
 }
 
 func (t *T) onObjectStatusUpdated(c *msgbus.ObjectStatusUpdated) {
@@ -761,6 +801,7 @@ func (t *T) onObjectStatusUpdated(c *msgbus.ObjectStatusUpdated) {
 		delete(t.provisioned, c.Path)
 		return
 	}
+	t.failover[c.Path] = c.Value.Topology == topology.Failover
 	isProvisioned := c.Value.Provisioned.IsOneOf(provisioned.True, provisioned.NotApplicable)
 	wasProvisioned, ok := t.provisioned[c.Path]
 	t.provisioned[c.Path] = isProvisioned
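The scheduler change makes a job on a failover object inherit the most recent run time seen on a peer instance, so it is not re-run locally right after a takeover; peerInstanceLastRun returns the zero time whenever the gating conditions are not met, which makes the Before comparison a no-op. A simplified, self-contained sketch of that gating, where the type, field names and require strings are illustrative only and not the daemon's API:

package main

import (
	"fmt"
	"strings"
	"time"
)

type entry struct {
	topology  string    // "failover" or "flex"
	require   string    // task requirement, e.g. "up"
	lastRunAt time.Time // last run recorded locally
	peerRunAt time.Time // most recent run seen on any peer
}

// peerLastRun returns the peer's last run time only for failover objects
// whose requirement does not tolerate down or warn instances; otherwise it
// returns the zero time so the caller keeps its local last run.
func peerLastRun(e entry) time.Time {
	if e.topology != "failover" || e.require == "" {
		return time.Time{}
	}
	if strings.Contains(e.require, "down") || strings.Contains(e.require, "warn") {
		return time.Time{}
	}
	return e.peerRunAt
}

func main() {
	e := entry{
		topology:  "failover",
		require:   "up",
		lastRunAt: time.Now().Add(-2 * time.Hour),
		peerRunAt: time.Now().Add(-10 * time.Minute),
	}
	// Comparing against the zero time is always false, so objects that do
	// not pass the gating keep their local schedule untouched.
	if tm := peerLastRun(e); e.lastRunAt.Before(tm) {
		fmt.Println("adjust last run to peer time:", tm.Format(time.RFC3339))
	}
}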