@@ -12,6 +12,7 @@ import (
12
12
"strconv"
13
13
"strings"
14
14
"sync"
15
+ "sync/atomic"
15
16
"syscall"
16
17
"time"
17
18
57
58
clockIDRegEx = regexp .MustCompile (`\/dev\/ptp\d+` )
58
59
)
59
60
61
+ // A per-process guard (ptpProcess.clockClassRunning) is sufficient for clock class updates because each process owns a unique config.
62
+
60
63
// ProcessManager manages a set of ptpProcess
61
64
// which could be ptp4l, phc2sys or timemaster.
62
65
// Processes in ProcessManager will be started
@@ -150,6 +153,7 @@ type ptpProcess struct {
150
153
nodeProfile ptpv1.PtpProfile
151
154
parentClockClass float64
152
155
pmcCheck bool
156
+ clockClassRunning atomic.Bool
153
157
clockType event.ClockType
154
158
ptpClockThreshold * ptpv1.PtpClockThreshold
155
159
haProfile map [string ][]string // stores list of interface name for each profile
@@ -170,6 +174,23 @@ func (p *ptpProcess) setStopped(val bool) {
170
174
p .execMutex .Unlock ()
171
175
}
172
176
177
+ // TriggerPmcCheck sets pmcCheck to true in a thread-safe way
178
+ func (p * ptpProcess ) TriggerPmcCheck () {
179
+ p .execMutex .Lock ()
180
+ p .pmcCheck = true
181
+ p .execMutex .Unlock ()
182
+ }
183
+
184
+ // ConsumePmcCheck atomically reads and resets the pmcCheck flag.
185
+ // It returns true if a PMC check should be performed.
186
+ func (p * ptpProcess ) ConsumePmcCheck () bool {
187
+ p .execMutex .Lock ()
188
+ val := p .pmcCheck
189
+ p .pmcCheck = false
190
+ p .execMutex .Unlock ()
191
+ return val
192
+ }
193
+
173
194
// Daemon is the main structure for linuxptp instance.
174
195
// It contains all the necessary data to run linuxptp instance.
175
196
type Daemon struct {
@@ -694,7 +715,7 @@ func (dn *Daemon) GetPhaseOffsetPinFilter(nodeProfile *ptpv1.PtpProfile) map[str
694
715
// HandlePmcTicker requests a PMC check on every process in the process
// manager whose name is ptp4lProcessName. Processes with any other name are
// left untouched.
func (dn *Daemon) HandlePmcTicker() {
	for _, proc := range dn.processManager.process {
		if proc.name != ptp4lProcessName {
			continue
		}
		proc.TriggerPmcCheck()
	}
}
@@ -737,6 +758,12 @@ func processStatus(c *net.Conn, processName, messageTag string, status int64) {
737
758
}
738
759
739
760
func (p * ptpProcess ) updateClockClass (c * net.Conn ) {
761
+ // Per-process single-flight guard
762
+ if ! p .clockClassRunning .CompareAndSwap (false , true ) {
763
+ glog .Infof ("clock class update already running for %s, skipping this run" , p .configName )
764
+ return
765
+ }
766
+ defer p .clockClassRunning .Store (false )
740
767
defer func () {
741
768
if r := recover (); r != nil {
742
769
glog .Errorf ("updateClockClass Recovered in f %#v" , r )
@@ -760,7 +787,7 @@ func (p *ptpProcess) updateClockClass(c *net.Conn) {
760
787
// change to print every minute or when the clock class changes
761
788
clockClassOut = fmt .Sprintf ("%s[%d]:[%s] CLOCK_CLASS_CHANGE %f\n " , p .name , time .Now ().Unix (), p .configName , p .parentClockClass )
762
789
if c == nil {
763
- UpdateClockClassMetrics (clockClass ) // no socket then update metrics
790
+ UpdateClockClassMetrics (p . configName , clockClass ) // no socket then update metrics
764
791
} else {
765
792
_ , err := (* c ).Write ([]byte (clockClassOut ))
766
793
if err != nil {
@@ -850,12 +877,26 @@ func (p *ptpProcess) cmdRun(stdoutToSocket bool) {
850
877
d .ProcessStatus (p .c , PtpProcessUp )
851
878
}
852
879
}
880
+ // moved outside the scanner loop so the clock class update routine keeps
881
+ // running even if the process hangs
882
+ go func () {
883
+ for {
884
+ select {
885
+ case <- p .exitCh :
886
+ glog .Infof ("Exiting pmcCheck%s..." , p .name )
887
+ return
888
+ default :
889
+ if p .ConsumePmcCheck () {
890
+ p .updateClockClass (p .c )
891
+ }
892
+ // Add a small sleep to avoid a tight CPU loop
893
+ time .Sleep (100 * time .Millisecond )
894
+ }
895
+ }
896
+ }()
897
+
853
898
for scanner .Scan () {
854
899
output := scanner .Text ()
855
- if p .pmcCheck {
856
- p .pmcCheck = false
857
- go p .updateClockClass (p .c )
858
- }
859
900
860
901
if regexErr != nil || ! logFilterRegex .MatchString (output ) {
861
902
fmt .Printf ("%s\n " , output )
@@ -929,7 +970,7 @@ func (p *ptpProcess) processPTPMetrics(output string) {
929
970
logEntry := synce .ParseLog (output )
930
971
p .ProcessSynceEvents (logEntry )
931
972
} else {
932
- configName , source , ptpOffset , clockState , iface := extractMetrics (p .messageTag , p .name , p .ifaces , output )
973
+ configName , source , ptpOffset , clockState , iface := extractMetrics (p .messageTag , p .name , p .ifaces , output , p . c == nil )
933
974
if iface != "" { // for ptp4l/phc2sys this function only update metrics
934
975
var values map [event.ValueType ]interface {}
935
976
ifaceName := masterOffsetIface .getByAlias (configName , iface ).name
@@ -950,6 +991,17 @@ func (p *ptpProcess) processPTPMetrics(output string) {
950
991
state = event .PTP_HOLDOVER // consider s1 state as holdover,this passed to event to create metrics and events
951
992
}
952
993
p .ProcessTs2PhcEvents (ptpOffset , source , ifaceName , state , values )
994
+ } else if clockState == HOLDOVER || clockState == LOCKED {
995
+ // in case of holdover without iface, still need to update clock class for T_G
996
+ if p .name != ts2phcProcessName && p .name != syncEProcessName { // TGM announce clock class via events
997
+ p .ConsumePmcCheck () // reset pmc check since we are updating clock class here
998
+ // on faulty port or recovery of slave port there might be a clock class change
999
+ go func () {
1000
+ time .Sleep (50 * time .Millisecond )
1001
+ p .updateClockClass (p .c )
1002
+ glog .Infof ("clock class updated %f" , p .parentClockClass )
1003
+ }()
1004
+ }
953
1005
}
954
1006
}
955
1007
}
@@ -961,6 +1013,9 @@ func (p *ptpProcess) cmdStop() {
961
1013
return
962
1014
}
963
1015
p .setStopped (true )
1016
+ // reset runtime flags
1017
+ p .ConsumePmcCheck ()
1018
+ p .clockClassRunning .Store (false )
964
1019
if p .cmd .Process != nil {
965
1020
glog .Infof ("Sending TERM to (%s) PID: %d" , p .name , p .cmd .Process .Pid )
966
1021
err := p .cmd .Process .Signal (syscall .SIGTERM )
@@ -1042,6 +1097,9 @@ func (p *ptpProcess) ProcessTs2PhcEvents(ptpOffset float64, source string, iface
1042
1097
if iface != "" && iface != clockRealTime {
1043
1098
iface = utils .GetAlias (iface )
1044
1099
}
1100
+ if p .c != nil {
1101
+ return // no metrics when socket is used
1102
+ }
1045
1103
switch ptpState {
1046
1104
case event .PTP_LOCKED :
1047
1105
updateClockStateMetrics (p .name , iface , LOCKED )
@@ -1257,10 +1315,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) {
1257
1315
ExtendedSSM : 0 ,
1258
1316
})
1259
1317
state = sDeviceConfig .LastClockState
1260
- UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "SSM" , logEntry .QL )
1261
- UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "Extended SSM" , synce .QL_DEFAULT_ENHSSM )
1262
- UpdateSynceClockQlMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , int (logEntry .QL )+ int (synce .QL_DEFAULT_ENHSSM ))
1263
-
1318
+ if p .c == nil { // only update metrics if no socket is used
1319
+ UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "SSM" , logEntry .QL )
1320
+ UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "Extended SSM" , synce .QL_DEFAULT_ENHSSM )
1321
+ UpdateSynceClockQlMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , int (logEntry .QL )+ int (synce .QL_DEFAULT_ENHSSM ))
1322
+ }
1264
1323
} else if sDeviceConfig .ExtendedTlv == synce .ExtendedTLV_ENABLED {
1265
1324
var lastQLState * synce.QualityLevelInfo
1266
1325
var ok bool
@@ -1284,9 +1343,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) {
1284
1343
ExtendedSSM : lastQLState .ExtendedSSM ,
1285
1344
Priority : 0 ,
1286
1345
})
1287
- UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "SSM" , lastQLState .SSM )
1288
- UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "Extended SSM" , logEntry .ExtQl )
1289
- UpdateSynceClockQlMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , int (lastQLState .SSM )+ int (logEntry .ExtQl ))
1346
+ if p .c == nil {
1347
+ UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "SSM" , lastQLState .SSM )
1348
+ UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "Extended SSM" , logEntry .ExtQl )
1349
+ UpdateSynceClockQlMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , int (lastQLState .SSM )+ int (logEntry .ExtQl ))
1350
+ }
1290
1351
1291
1352
state = sDeviceConfig .LastClockState
1292
1353
} else if logEntry .QL != synce .QL_DEFAULT_SSM { //else we have only QL
0 commit comments