diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 849c633f..0eaf61a8 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -57,6 +58,8 @@ var ( clockIDRegEx = regexp.MustCompile(`\/dev\/ptp\d+`) ) +// Per-process guard is sufficient because each process owns a unique config. + // ProcessManager manages a set of ptpProcess // which could be ptp4l, phc2sys or timemaster. // Processes in ProcessManager will be started @@ -150,6 +153,7 @@ type ptpProcess struct { nodeProfile ptpv1.PtpProfile parentClockClass float64 pmcCheck bool + clockClassRunning atomic.Bool clockType event.ClockType ptpClockThreshold *ptpv1.PtpClockThreshold haProfile map[string][]string // stores list of interface name for each profile @@ -170,6 +174,23 @@ func (p *ptpProcess) setStopped(val bool) { p.execMutex.Unlock() } +// TriggerPmcCheck sets pmcCheck to true in a thread-safe way +func (p *ptpProcess) TriggerPmcCheck() { + p.execMutex.Lock() + p.pmcCheck = true + p.execMutex.Unlock() +} + +// ConsumePmcCheck atomically reads and resets the pmcCheck flag. +// It returns true if a PMC check should be performed. +func (p *ptpProcess) ConsumePmcCheck() bool { + p.execMutex.Lock() + val := p.pmcCheck + p.pmcCheck = false + p.execMutex.Unlock() + return val +} + // Daemon is the main structure for linuxptp instance. // It contains all the necessary data to run linuxptp instance. type Daemon struct { @@ -694,7 +715,7 @@ func (dn *Daemon) GetPhaseOffsetPinFilter(nodeProfile *ptpv1.PtpProfile) map[str func (dn *Daemon) HandlePmcTicker() { for _, p := range dn.processManager.process { if p.name == ptp4lProcessName { - p.pmcCheck = true + p.TriggerPmcCheck() } } } @@ -737,6 +758,12 @@ func processStatus(c *net.Conn, processName, messageTag string, status int64) { } func (p *ptpProcess) updateClockClass(c *net.Conn) { + // Per-process single-flight guard + if !p.clockClassRunning.CompareAndSwap(false, true) { + glog.Infof("clock class update already running for %s, skipping this run", p.configName) + return + } + defer p.clockClassRunning.Store(false) defer func() { if r := recover(); r != nil { glog.Errorf("updateClockClass Recovered in f %#v", r) @@ -760,7 +787,7 @@ func (p *ptpProcess) updateClockClass(c *net.Conn) { // change to pint every minute or when the clock class changes clockClassOut = fmt.Sprintf("%s[%d]:[%s] CLOCK_CLASS_CHANGE %f\n", p.name, time.Now().Unix(), p.configName, p.parentClockClass) if c == nil { - UpdateClockClassMetrics(clockClass) // no socket then update metrics + UpdateClockClassMetrics(p.configName, clockClass) // no socket then update metrics } else { _, err := (*c).Write([]byte(clockClassOut)) if err != nil { @@ -850,12 +877,26 @@ func (p *ptpProcess) cmdRun(stdoutToSocket bool) { d.ProcessStatus(p.c, PtpProcessUp) } } + // moving outside scanner loop to ensure clock class update routine + // even if process hangs + go func() { + for { + select { + case <-p.exitCh: + glog.Infof("Exiting pmcCheck%s...", p.name) + return + default: + if p.ConsumePmcCheck() { + p.updateClockClass(p.c) + } + //Add a small sleep to avoid tight CPU loop + time.Sleep(100 * time.Millisecond) + } + } + }() + for scanner.Scan() { output := scanner.Text() - if p.pmcCheck { - p.pmcCheck = false - go p.updateClockClass(p.c) - } if regexErr != nil || !logFilterRegex.MatchString(output) { fmt.Printf("%s\n", output) @@ -929,7 +970,7 @@ func (p *ptpProcess) processPTPMetrics(output string) { logEntry := synce.ParseLog(output) p.ProcessSynceEvents(logEntry) } else { - configName, source, ptpOffset, clockState, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output) + configName, source, ptpOffset, clockState, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output, p.c == nil) if iface != "" { // for ptp4l/phc2sys this function only update metrics var values map[event.ValueType]interface{} ifaceName := masterOffsetIface.getByAlias(configName, iface).name @@ -950,6 +991,17 @@ func (p *ptpProcess) processPTPMetrics(output string) { state = event.PTP_HOLDOVER // consider s1 state as holdover,this passed to event to create metrics and events } p.ProcessTs2PhcEvents(ptpOffset, source, ifaceName, state, values) + } else if clockState == HOLDOVER || clockState == LOCKED { + // in case of holdover without iface, still need to update clock class for T_G + if p.name != ts2phcProcessName && p.name != syncEProcessName { // TGM announce clock class via events + p.ConsumePmcCheck() // reset pmc check since we are updating clock class here + // on faulty port or recovery of slave port there might be a clock class change + go func() { + time.Sleep(50 * time.Millisecond) + p.updateClockClass(p.c) + glog.Infof("clock class updated %f", p.parentClockClass) + }() + } } } } @@ -961,6 +1013,9 @@ func (p *ptpProcess) cmdStop() { return } p.setStopped(true) + // reset runtime flags + p.ConsumePmcCheck() + p.clockClassRunning.Store(false) if p.cmd.Process != nil { glog.Infof("Sending TERM to (%s) PID: %d", p.name, p.cmd.Process.Pid) err := p.cmd.Process.Signal(syscall.SIGTERM) @@ -1042,6 +1097,9 @@ func (p *ptpProcess) ProcessTs2PhcEvents(ptpOffset float64, source string, iface if iface != "" && iface != clockRealTime { iface = utils.GetAlias(iface) } + if p.c != nil { + return // no metrics when socket is used + } switch ptpState { case event.PTP_LOCKED: updateClockStateMetrics(p.name, iface, LOCKED) @@ -1257,10 +1315,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) { ExtendedSSM: 0, }) state = sDeviceConfig.LastClockState - UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", logEntry.QL) - UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", synce.QL_DEFAULT_ENHSSM) - UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(logEntry.QL)+int(synce.QL_DEFAULT_ENHSSM)) - + if p.c == nil { // only update metrics if no socket is used + UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", logEntry.QL) + UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", synce.QL_DEFAULT_ENHSSM) + UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(logEntry.QL)+int(synce.QL_DEFAULT_ENHSSM)) + } } else if sDeviceConfig.ExtendedTlv == synce.ExtendedTLV_ENABLED { var lastQLState *synce.QualityLevelInfo var ok bool @@ -1284,9 +1343,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) { ExtendedSSM: lastQLState.ExtendedSSM, Priority: 0, }) - UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", lastQLState.SSM) - UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", logEntry.ExtQl) - UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(lastQLState.SSM)+int(logEntry.ExtQl)) + if p.c == nil { + UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", lastQLState.SSM) + UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", logEntry.ExtQl) + UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(lastQLState.SSM)+int(logEntry.ExtQl)) + } state = sDeviceConfig.LastClockState } else if logEntry.QL != synce.QL_DEFAULT_SSM { //else we have only QL diff --git a/pkg/daemon/daemon_test.go b/pkg/daemon/daemon_test.go index 36658f4e..5d3d94e5 100644 --- a/pkg/daemon/daemon_test.go +++ b/pkg/daemon/daemon_test.go @@ -83,7 +83,7 @@ func (tc *TestCase) cleanupMetrics() { daemon.FrequencyAdjustment.With(map[string]string{"from": tc.from, "process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP) daemon.Delay.With(map[string]string{"from": tc.from, "process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP) daemon.ClockState.With(map[string]string{"process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP) - daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "node": tc.node}).Set(CLEANUP) + daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "config": "ptp4l.0.config", "node": tc.node}).Set(CLEANUP) daemon.InterfaceRole.With(map[string]string{"process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP) } @@ -267,7 +267,7 @@ func Test_ProcessPTPMetrics(t *testing.T) { assert.Equal(tc.expectedClockState, testutil.ToFloat64(clockState), "ClockState does not match\n%s", tc.String()) } if tc.expectedClockClassMetrics != SKIP { - clockClassMetrics := daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "node": tc.node}) + clockClassMetrics := daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "config": "ptp4l.0.config", "node": tc.node}) assert.Equal(tc.expectedClockClassMetrics, testutil.ToFloat64(clockClassMetrics), "ClockClassMetrics does not match\n%s", tc.String()) } if tc.expectedInterfaceRole != SKIP { diff --git a/pkg/daemon/metrics.go b/pkg/daemon/metrics.go index 45827155..247fc08c 100644 --- a/pkg/daemon/metrics.go +++ b/pkg/daemon/metrics.go @@ -142,7 +142,7 @@ var ( Subsystem: PTPSubsystem, Name: "clock_class", Help: "6 = Locked, 7 = PRC unlocked in-spec, 52/187 = PRC unlocked out-of-spec, 135 = T-BC holdover in-spec, 165 = T-BC holdover out-of-spec, 248 = Default, 255 = Slave Only Clock", - }, []string{"process", "node"}) + }, []string{"process", "node", "config"}) // InterfaceRole metrics to show current interface role InterfaceRole = prometheus.NewGaugeVec( @@ -256,7 +256,7 @@ func updatePTPMetrics(from, process, iface string, ptpOffset, maxPtpOffset, freq } // extractMetrics ... -func extractMetrics(messageTag string, processName string, ifaces config.IFaces, output string) (configName, source string, offset float64, state string, iface string) { +func extractMetrics(messageTag string, processName string, ifaces config.IFaces, output string, updateMetrics bool) (configName, source string, offset float64, state string, iface string) { configName = strings.Replace(strings.Replace(messageTag, "]", "", 1), "[", "", 1) if configName != "" { configName = strings.Split(configName, MessageTagSuffixSeperator)[0] // remove any suffix added to the configName @@ -287,8 +287,10 @@ func extractMetrics(messageTag string, processName string, ifaces config.IFaces, if offsetSource == master { masterOffsetSource.set(configName, processName) } - updatePTPMetrics(offsetSource, processName, ifaceName, ptpOffset, maxPtpOffset, frequencyAdjustment, delay) - updateClockStateMetrics(processName, ifaceName, clockstate) + if updateMetrics { + updatePTPMetrics(offsetSource, processName, ifaceName, ptpOffset, maxPtpOffset, frequencyAdjustment, delay) + updateClockStateMetrics(processName, ifaceName, clockstate) + } } source = processName offset = ptpOffset @@ -302,14 +304,18 @@ func extractMetrics(messageTag string, processName string, ifaces config.IFaces, if role == SLAVE { masterOffsetIface.set(configName, ifaces[portId-1].Name) slaveIface.set(configName, ifaces[portId-1].Name) + state = LOCKED // initial state to indicate we are locked when slave is back for clockclass to trigger } else if role == FAULTY { if slaveIface.isFaulty(configName, ifaces[portId-1].Name) && masterOffsetSource.get(configName) == ptp4lProcessName { - updatePTPMetrics(master, processName, masterOffsetIface.get(configName).alias, faultyOffset, faultyOffset, 0, 0) - updatePTPMetrics(phc, phc2sysProcessName, clockRealTime, faultyOffset, faultyOffset, 0, 0) - updateClockStateMetrics(processName, masterOffsetIface.get(configName).alias, FREERUN) + if updateMetrics { + updatePTPMetrics(master, processName, masterOffsetIface.get(configName).alias, faultyOffset, faultyOffset, 0, 0) + updatePTPMetrics(phc, phc2sysProcessName, clockRealTime, faultyOffset, faultyOffset, 0, 0) + updateClockStateMetrics(processName, masterOffsetIface.get(configName).alias, FREERUN) + } masterOffsetIface.set(configName, "") slaveIface.set(configName, "") + state = HOLDOVER } } } @@ -528,9 +534,9 @@ func UpdateInterfaceRoleMetrics(process string, iface string, role ptpPortRole) } // UpdateClockClassMetrics ... update clock class metrics -func UpdateClockClassMetrics(clockClass float64) { +func UpdateClockClassMetrics(cfgName string, clockClass float64) { ClockClassMetrics.With(prometheus.Labels{ - "process": ptp4lProcessName, "node": NodeName}).Set(float64(clockClass)) + "process": ptp4lProcessName, "config": cfgName, "node": NodeName}).Set(float64(clockClass)) } func UpdateProcessStatusMetrics(process, cfgName string, status int64) { diff --git a/pkg/event/event.go b/pkg/event/event.go index 8390e3f9..9c797dc3 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -216,10 +216,6 @@ func Init(nodeName string, stdOutToSocket bool, socketName string, processChanne frequencyTraceable: false, ReduceLog: true, } - if clockClassMetric != nil { - clockClassMetric.With(prometheus.Labels{ - "process": PTP4lProcessName, "node": nodeName}).Set(248) - } StateRegisterer = NewStateNotifier() return ptpEvent @@ -683,7 +679,9 @@ connect: if event.WriteToLog && logDataValues != "" { logOut = append(logOut, logDataValues) } - e.UpdateClockStateMetrics(event.State, string(event.ProcessName), event.IFace) + if !e.stdoutToSocket { + e.UpdateClockStateMetrics(event.State, string(event.ProcessName), event.IFace) + } } else { // Update the in MemData dataDetails := e.addEvent(event) @@ -884,6 +882,9 @@ func (e *EventHandler) GetPTPState(source EventSource, cfgName string) PTPState // UpdateClockStateMetrics ... func (e *EventHandler) UpdateClockStateMetrics(state PTPState, process, iFace string) { + if e.stdoutToSocket { + return + } labels := prometheus.Labels{ "process": process, "node": e.nodeName, "iface": iFace} if state == PTP_LOCKED { @@ -978,6 +979,9 @@ func registerMetrics(m *prometheus.GaugeVec) { } func (e *EventHandler) unregisterMetrics(configName string, processName string) { + if e.stdoutToSocket { + return // no need to unregister metrics if events are going to socket + } if data, ok := e.data[configName]; ok { for _, v := range data { if string(v.ProcessName) == processName || processName == "" { @@ -1046,7 +1050,7 @@ func (e *EventHandler) UpdateClockClass(c net.Conn, clk ClockClassRequest) { } } else { e.clockClassMetric.With(prometheus.Labels{ - "process": PTP4lProcessName, "node": e.nodeName}).Set(float64(clockClass)) + "process": PTP4lProcessName, "node": e.nodeName, "config": clk.cfgName}).Set(float64(clockClass)) } fmt.Printf("%s", clockClassOut) }