Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 75 additions & 14 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -57,6 +58,8 @@ var (
clockIDRegEx = regexp.MustCompile(`\/dev\/ptp\d+`)
)

// Per-process guard is sufficient because each process owns a unique config.

// ProcessManager manages a set of ptpProcess
// which could be ptp4l, phc2sys or timemaster.
// Processes in ProcessManager will be started
Expand Down Expand Up @@ -150,6 +153,7 @@ type ptpProcess struct {
nodeProfile ptpv1.PtpProfile
parentClockClass float64
pmcCheck bool
clockClassRunning atomic.Bool
clockType event.ClockType
ptpClockThreshold *ptpv1.PtpClockThreshold
haProfile map[string][]string // stores list of interface name for each profile
Expand All @@ -170,6 +174,23 @@ func (p *ptpProcess) setStopped(val bool) {
p.execMutex.Unlock()
}

// TriggerPmcCheck requests a PMC check by raising the pmcCheck flag while
// holding execMutex.
func (p *ptpProcess) TriggerPmcCheck() {
	p.execMutex.Lock()
	defer p.execMutex.Unlock()
	p.pmcCheck = true
}

// ConsumePmcCheck reports whether a PMC check was requested and clears the
// request. The read and the reset happen in one critical section guarded by
// execMutex, so a request raised through TriggerPmcCheck is observed as true
// by at most one ConsumePmcCheck call.
func (p *ptpProcess) ConsumePmcCheck() bool {
	p.execMutex.Lock()
	defer p.execMutex.Unlock()
	pending := p.pmcCheck
	p.pmcCheck = false
	return pending
}

// Daemon is the main structure for linuxptp instance.
// It contains all the necessary data to run linuxptp instance.
type Daemon struct {
Expand Down Expand Up @@ -694,7 +715,7 @@ func (dn *Daemon) GetPhaseOffsetPinFilter(nodeProfile *ptpv1.PtpProfile) map[str
// HandlePmcTicker flags every ptp4l process held by the daemon's process
// manager so that its next ConsumePmcCheck call reports a pending PMC check.
// Processes with any other name are left untouched.
func (dn *Daemon) HandlePmcTicker() {
	for _, p := range dn.processManager.process {
		if p.name != ptp4lProcessName {
			continue
		}
		// Raise the flag only through TriggerPmcCheck: it takes execMutex,
		// whereas a direct write to p.pmcCheck would be unsynchronized with
		// the locked read-and-reset in ConsumePmcCheck.
		p.TriggerPmcCheck()
	}
}
Expand Down Expand Up @@ -737,6 +758,12 @@ func processStatus(c *net.Conn, processName, messageTag string, status int64) {
}

func (p *ptpProcess) updateClockClass(c *net.Conn) {
// Per-process single-flight guard
if !p.clockClassRunning.CompareAndSwap(false, true) {
glog.Infof("clock class update already running for %s, skipping this run", p.configName)
return
}
defer p.clockClassRunning.Store(false)
defer func() {
if r := recover(); r != nil {
glog.Errorf("updateClockClass Recovered in f %#v", r)
Expand All @@ -760,7 +787,7 @@ func (p *ptpProcess) updateClockClass(c *net.Conn) {
// change to print every minute or when the clock class changes
clockClassOut = fmt.Sprintf("%s[%d]:[%s] CLOCK_CLASS_CHANGE %f\n", p.name, time.Now().Unix(), p.configName, p.parentClockClass)
if c == nil {
UpdateClockClassMetrics(clockClass) // no socket then update metrics
UpdateClockClassMetrics(p.configName, clockClass) // no socket then update metrics
} else {
_, err := (*c).Write([]byte(clockClassOut))
if err != nil {
Expand Down Expand Up @@ -850,12 +877,26 @@ func (p *ptpProcess) cmdRun(stdoutToSocket bool) {
d.ProcessStatus(p.c, PtpProcessUp)
}
}
// Run the clock class update routine outside the scanner loop so that it
// still runs even if the process hangs.
go func() {
for {
select {
case <-p.exitCh:
glog.Infof("Exiting pmcCheck%s...", p.name)
return
default:
if p.ConsumePmcCheck() {
p.updateClockClass(p.c)
}
// Add a small sleep to avoid a tight CPU loop
time.Sleep(100 * time.Millisecond)
}
}
}()

for scanner.Scan() {
output := scanner.Text()
if p.pmcCheck {
p.pmcCheck = false
go p.updateClockClass(p.c)
}

if regexErr != nil || !logFilterRegex.MatchString(output) {
fmt.Printf("%s\n", output)
Expand Down Expand Up @@ -929,7 +970,7 @@ func (p *ptpProcess) processPTPMetrics(output string) {
logEntry := synce.ParseLog(output)
p.ProcessSynceEvents(logEntry)
} else {
configName, source, ptpOffset, clockState, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output)
configName, source, ptpOffset, clockState, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output, p.c == nil)
if iface != "" { // for ptp4l/phc2sys this function only update metrics
var values map[event.ValueType]interface{}
ifaceName := masterOffsetIface.getByAlias(configName, iface).name
Expand All @@ -950,6 +991,17 @@ func (p *ptpProcess) processPTPMetrics(output string) {
state = event.PTP_HOLDOVER // consider s1 state as holdover,this passed to event to create metrics and events
}
p.ProcessTs2PhcEvents(ptpOffset, source, ifaceName, state, values)
} else if clockState == HOLDOVER || clockState == LOCKED {
// in case of holdover without iface, still need to update clock class for T_G
if p.name != ts2phcProcessName && p.name != syncEProcessName { // TGM announce clock class via events
p.ConsumePmcCheck() // reset pmc check since we are updating clock class here
// on faulty port or recovery of slave port there might be a clock class change
go func() {
time.Sleep(50 * time.Millisecond)
p.updateClockClass(p.c)
glog.Infof("clock class updated %f", p.parentClockClass)
}()
}
}
}
}
Expand All @@ -961,6 +1013,9 @@ func (p *ptpProcess) cmdStop() {
return
}
p.setStopped(true)
// reset runtime flags
p.ConsumePmcCheck()
p.clockClassRunning.Store(false)
if p.cmd.Process != nil {
glog.Infof("Sending TERM to (%s) PID: %d", p.name, p.cmd.Process.Pid)
err := p.cmd.Process.Signal(syscall.SIGTERM)
Expand Down Expand Up @@ -1042,6 +1097,9 @@ func (p *ptpProcess) ProcessTs2PhcEvents(ptpOffset float64, source string, iface
if iface != "" && iface != clockRealTime {
iface = utils.GetAlias(iface)
}
if p.c != nil {
return // no metrics when socket is used
}
switch ptpState {
case event.PTP_LOCKED:
updateClockStateMetrics(p.name, iface, LOCKED)
Expand Down Expand Up @@ -1257,10 +1315,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) {
ExtendedSSM: 0,
})
state = sDeviceConfig.LastClockState
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", logEntry.QL)
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", synce.QL_DEFAULT_ENHSSM)
UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(logEntry.QL)+int(synce.QL_DEFAULT_ENHSSM))

if p.c == nil { // only update metrics if no socket is used
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", logEntry.QL)
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", synce.QL_DEFAULT_ENHSSM)
UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(logEntry.QL)+int(synce.QL_DEFAULT_ENHSSM))
}
} else if sDeviceConfig.ExtendedTlv == synce.ExtendedTLV_ENABLED {
var lastQLState *synce.QualityLevelInfo
var ok bool
Expand All @@ -1284,9 +1343,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) {
ExtendedSSM: lastQLState.ExtendedSSM,
Priority: 0,
})
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", lastQLState.SSM)
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", logEntry.ExtQl)
UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(lastQLState.SSM)+int(logEntry.ExtQl))
if p.c == nil {
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "SSM", lastQLState.SSM)
UpdateSynceQLMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, "Extended SSM", logEntry.ExtQl)
UpdateSynceClockQlMetrics(syncEProcessName, p.configName, iface, sDeviceConfig.NetworkOption, sDeviceConfig.Name, int(lastQLState.SSM)+int(logEntry.ExtQl))
}

state = sDeviceConfig.LastClockState
} else if logEntry.QL != synce.QL_DEFAULT_SSM { //else we have only QL
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (tc *TestCase) cleanupMetrics() {
daemon.FrequencyAdjustment.With(map[string]string{"from": tc.from, "process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP)
daemon.Delay.With(map[string]string{"from": tc.from, "process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP)
daemon.ClockState.With(map[string]string{"process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP)
daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "node": tc.node}).Set(CLEANUP)
daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "config": "ptp4l.0.config", "node": tc.node}).Set(CLEANUP)
daemon.InterfaceRole.With(map[string]string{"process": tc.process, "node": tc.node, "iface": tc.iface}).Set(CLEANUP)
}

Expand Down Expand Up @@ -267,7 +267,7 @@ func Test_ProcessPTPMetrics(t *testing.T) {
assert.Equal(tc.expectedClockState, testutil.ToFloat64(clockState), "ClockState does not match\n%s", tc.String())
}
if tc.expectedClockClassMetrics != SKIP {
clockClassMetrics := daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "node": tc.node})
clockClassMetrics := daemon.ClockClassMetrics.With(map[string]string{"process": tc.process, "config": "ptp4l.0.config", "node": tc.node})
assert.Equal(tc.expectedClockClassMetrics, testutil.ToFloat64(clockClassMetrics), "ClockClassMetrics does not match\n%s", tc.String())
}
if tc.expectedInterfaceRole != SKIP {
Expand Down
24 changes: 15 additions & 9 deletions pkg/daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ var (
Subsystem: PTPSubsystem,
Name: "clock_class",
Help: "6 = Locked, 7 = PRC unlocked in-spec, 52/187 = PRC unlocked out-of-spec, 135 = T-BC holdover in-spec, 165 = T-BC holdover out-of-spec, 248 = Default, 255 = Slave Only Clock",
}, []string{"process", "node"})
}, []string{"process", "node", "config"})

// InterfaceRole metrics to show current interface role
InterfaceRole = prometheus.NewGaugeVec(
Expand Down Expand Up @@ -256,7 +256,7 @@ func updatePTPMetrics(from, process, iface string, ptpOffset, maxPtpOffset, freq
}

// extractMetrics ...
func extractMetrics(messageTag string, processName string, ifaces config.IFaces, output string) (configName, source string, offset float64, state string, iface string) {
func extractMetrics(messageTag string, processName string, ifaces config.IFaces, output string, updateMetrics bool) (configName, source string, offset float64, state string, iface string) {
configName = strings.Replace(strings.Replace(messageTag, "]", "", 1), "[", "", 1)
if configName != "" {
configName = strings.Split(configName, MessageTagSuffixSeperator)[0] // remove any suffix added to the configName
Expand Down Expand Up @@ -287,8 +287,10 @@ func extractMetrics(messageTag string, processName string, ifaces config.IFaces,
if offsetSource == master {
masterOffsetSource.set(configName, processName)
}
updatePTPMetrics(offsetSource, processName, ifaceName, ptpOffset, maxPtpOffset, frequencyAdjustment, delay)
updateClockStateMetrics(processName, ifaceName, clockstate)
if updateMetrics {
updatePTPMetrics(offsetSource, processName, ifaceName, ptpOffset, maxPtpOffset, frequencyAdjustment, delay)
updateClockStateMetrics(processName, ifaceName, clockstate)
}
}
source = processName
offset = ptpOffset
Expand All @@ -302,14 +304,18 @@ func extractMetrics(messageTag string, processName string, ifaces config.IFaces,
if role == SLAVE {
masterOffsetIface.set(configName, ifaces[portId-1].Name)
slaveIface.set(configName, ifaces[portId-1].Name)
state = LOCKED // initial state to indicate we are locked when slave is back for clockclass to trigger
} else if role == FAULTY {
if slaveIface.isFaulty(configName, ifaces[portId-1].Name) &&
masterOffsetSource.get(configName) == ptp4lProcessName {
updatePTPMetrics(master, processName, masterOffsetIface.get(configName).alias, faultyOffset, faultyOffset, 0, 0)
updatePTPMetrics(phc, phc2sysProcessName, clockRealTime, faultyOffset, faultyOffset, 0, 0)
updateClockStateMetrics(processName, masterOffsetIface.get(configName).alias, FREERUN)
if updateMetrics {
updatePTPMetrics(master, processName, masterOffsetIface.get(configName).alias, faultyOffset, faultyOffset, 0, 0)
updatePTPMetrics(phc, phc2sysProcessName, clockRealTime, faultyOffset, faultyOffset, 0, 0)
updateClockStateMetrics(processName, masterOffsetIface.get(configName).alias, FREERUN)
}
masterOffsetIface.set(configName, "")
slaveIface.set(configName, "")
state = HOLDOVER
}
}
}
Expand Down Expand Up @@ -528,9 +534,9 @@ func UpdateInterfaceRoleMetrics(process string, iface string, role ptpPortRole)
}

// UpdateClockClassMetrics ... update clock class metrics
func UpdateClockClassMetrics(clockClass float64) {
func UpdateClockClassMetrics(cfgName string, clockClass float64) {
ClockClassMetrics.With(prometheus.Labels{
"process": ptp4lProcessName, "node": NodeName}).Set(float64(clockClass))
"process": ptp4lProcessName, "config": cfgName, "node": NodeName}).Set(float64(clockClass))
}

func UpdateProcessStatusMetrics(process, cfgName string, status int64) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,6 @@ func Init(nodeName string, stdOutToSocket bool, socketName string, processChanne
frequencyTraceable: false,
ReduceLog: true,
}
if clockClassMetric != nil {
clockClassMetric.With(prometheus.Labels{
"process": PTP4lProcessName, "node": nodeName}).Set(248)
}
StateRegisterer = NewStateNotifier()
return ptpEvent

Expand Down Expand Up @@ -683,7 +679,9 @@ connect:
if event.WriteToLog && logDataValues != "" {
logOut = append(logOut, logDataValues)
}
e.UpdateClockStateMetrics(event.State, string(event.ProcessName), event.IFace)
if !e.stdoutToSocket {
e.UpdateClockStateMetrics(event.State, string(event.ProcessName), event.IFace)
}
} else {
// Update the in MemData
dataDetails := e.addEvent(event)
Expand Down Expand Up @@ -884,6 +882,9 @@ func (e *EventHandler) GetPTPState(source EventSource, cfgName string) PTPState

// UpdateClockStateMetrics ...
func (e *EventHandler) UpdateClockStateMetrics(state PTPState, process, iFace string) {
if e.stdoutToSocket {
return
}
labels := prometheus.Labels{
"process": process, "node": e.nodeName, "iface": iFace}
if state == PTP_LOCKED {
Expand Down Expand Up @@ -978,6 +979,9 @@ func registerMetrics(m *prometheus.GaugeVec) {
}

func (e *EventHandler) unregisterMetrics(configName string, processName string) {
if e.stdoutToSocket {
return // no need to unregister metrics if events are going to socket
}
if data, ok := e.data[configName]; ok {
for _, v := range data {
if string(v.ProcessName) == processName || processName == "" {
Expand Down Expand Up @@ -1046,7 +1050,7 @@ func (e *EventHandler) UpdateClockClass(c net.Conn, clk ClockClassRequest) {
}
} else {
e.clockClassMetric.With(prometheus.Labels{
"process": PTP4lProcessName, "node": e.nodeName}).Set(float64(clockClass))
"process": PTP4lProcessName, "node": e.nodeName, "config": clk.cfgName}).Set(float64(clockClass))
}
fmt.Printf("%s", clockClassOut)
}
Expand Down