@@ -12,6 +12,7 @@ import (
12
12
"strconv"
13
13
"strings"
14
14
"sync"
15
+ "sync/atomic"
15
16
"syscall"
16
17
"time"
17
18
@@ -150,6 +151,7 @@ type ptpProcess struct {
150
151
nodeProfile ptpv1.PtpProfile
151
152
parentClockClass float64
152
153
pmcCheck bool
154
+ clockClassRunning int32 // 0 = not running, 1 = running
153
155
clockType event.ClockType
154
156
ptpClockThreshold * ptpv1.PtpClockThreshold
155
157
haProfile map [string ][]string // stores list of interface name for each profile
@@ -170,6 +172,23 @@ func (p *ptpProcess) setStopped(val bool) {
170
172
p .execMutex .Unlock ()
171
173
}
172
174
175
+ // SetPmcCheck updates the pmcCheck flag in a thread-safe way
176
+ func (p * ptpProcess ) SetPmcCheck (val bool ) {
177
+ p .execMutex .Lock ()
178
+ p .pmcCheck = val
179
+ p .execMutex .Unlock ()
180
+ }
181
+
182
+ // ConsumePmcCheck atomically reads and resets the pmcCheck flag.
183
+ // It returns true if a PMC check should be performed.
184
+ func (p * ptpProcess ) ConsumePmcCheck () bool {
185
+ p .execMutex .Lock ()
186
+ val := p .pmcCheck
187
+ p .pmcCheck = false
188
+ p .execMutex .Unlock ()
189
+ return val
190
+ }
191
+
173
192
// Daemon is the main structure for linuxptp instance.
174
193
// It contains all the necessary data to run linuxptp instance.
175
194
type Daemon struct {
@@ -694,7 +713,7 @@ func (dn *Daemon) GetPhaseOffsetPinFilter(nodeProfile *ptpv1.PtpProfile) map[str
694
713
func (dn * Daemon ) HandlePmcTicker () {
695
714
for _ , p := range dn .processManager .process {
696
715
if p .name == ptp4lProcessName {
697
- p .pmcCheck = true
716
+ p .SetPmcCheck ( true )
698
717
}
699
718
}
700
719
}
@@ -737,6 +756,12 @@ func processStatus(c *net.Conn, processName, messageTag string, status int64) {
737
756
}
738
757
739
758
func (p * ptpProcess ) updateClockClass (c * net.Conn ) {
759
+ // Single-flight guard to prevent concurrent executions per ptpProcess
760
+ if ! atomic .CompareAndSwapInt32 (& p .clockClassRunning , 0 , 1 ) {
761
+ glog .Infof ("clock class update already running, skipping this run" )
762
+ return
763
+ }
764
+ defer atomic .StoreInt32 (& p .clockClassRunning , 0 )
740
765
defer func () {
741
766
if r := recover (); r != nil {
742
767
glog .Errorf ("updateClockClass Recovered in f %#v" , r )
@@ -760,7 +785,7 @@ func (p *ptpProcess) updateClockClass(c *net.Conn) {
760
785
// change to pint every minute or when the clock class changes
761
786
clockClassOut = fmt .Sprintf ("%s[%d]:[%s] CLOCK_CLASS_CHANGE %f\n " , p .name , time .Now ().Unix (), p .configName , p .parentClockClass )
762
787
if c == nil {
763
- UpdateClockClassMetrics (clockClass ) // no socket then update metrics
788
+ UpdateClockClassMetrics (p . configName , clockClass ) // no socket then update metrics
764
789
} else {
765
790
_ , err := (* c ).Write ([]byte (clockClassOut ))
766
791
if err != nil {
@@ -850,12 +875,26 @@ func (p *ptpProcess) cmdRun(stdoutToSocket bool) {
850
875
d .ProcessStatus (p .c , PtpProcessUp )
851
876
}
852
877
}
878
+ // moving outside scanner loop to ensure clock class update routine
879
+ // even if process hangs
880
+ go func () {
881
+ for {
882
+ select {
883
+ case <- p .exitCh :
884
+ glog .Infof ("Exiting pmcCheck%s..." , p .name )
885
+ return
886
+ default :
887
+ if p .ConsumePmcCheck () {
888
+ p .updateClockClass (p .c )
889
+ }
890
+ //Add a small sleep to avoid tight CPU loop
891
+ time .Sleep (100 * time .Millisecond )
892
+ }
893
+ }
894
+ }()
895
+
853
896
for scanner .Scan () {
854
897
output := scanner .Text ()
855
- if p .pmcCheck {
856
- p .pmcCheck = false
857
- go p .updateClockClass (p .c )
858
- }
859
898
860
899
if regexErr != nil || ! logFilterRegex .MatchString (output ) {
861
900
fmt .Printf ("%s\n " , output )
@@ -929,7 +968,7 @@ func (p *ptpProcess) processPTPMetrics(output string) {
929
968
logEntry := synce .ParseLog (output )
930
969
p .ProcessSynceEvents (logEntry )
931
970
} else {
932
- configName , source , ptpOffset , clockState , iface := extractMetrics (p .messageTag , p .name , p .ifaces , output )
971
+ configName , source , ptpOffset , clockState , iface := extractMetrics (p .messageTag , p .name , p .ifaces , output , p . c == nil )
933
972
if iface != "" { // for ptp4l/phc2sys this function only update metrics
934
973
var values map [event.ValueType ]interface {}
935
974
ifaceName := masterOffsetIface .getByAlias (configName , iface ).name
@@ -950,6 +989,17 @@ func (p *ptpProcess) processPTPMetrics(output string) {
950
989
state = event .PTP_HOLDOVER // consider s1 state as holdover,this passed to event to create metrics and events
951
990
}
952
991
p .ProcessTs2PhcEvents (ptpOffset , source , ifaceName , state , values )
992
+ } else if clockState == HOLDOVER || clockState == LOCKED {
993
+ // in case of holdover without iface, still need to update clock class for T_G
994
+ if p .name != ts2phcProcessName && p .name != syncEProcessName { // TGM announce clock class via events
995
+ p .SetPmcCheck (false ) // reset pmc check since we are updating clock class here
996
+ // on faulty port or recovery of slave port there might be a clock class change
997
+ go func () {
998
+ time .Sleep (50 * time .Millisecond )
999
+ p .updateClockClass (p .c )
1000
+ glog .Infof ("clock class updated %f" , p .parentClockClass )
1001
+ }()
1002
+ }
953
1003
}
954
1004
}
955
1005
}
@@ -961,6 +1011,9 @@ func (p *ptpProcess) cmdStop() {
961
1011
return
962
1012
}
963
1013
p .setStopped (true )
1014
+ // reset runtime flags
1015
+ p .SetPmcCheck (false )
1016
+ atomic .StoreInt32 (& p .clockClassRunning , 0 )
964
1017
if p .cmd .Process != nil {
965
1018
glog .Infof ("Sending TERM to (%s) PID: %d" , p .name , p .cmd .Process .Pid )
966
1019
err := p .cmd .Process .Signal (syscall .SIGTERM )
@@ -1042,6 +1095,9 @@ func (p *ptpProcess) ProcessTs2PhcEvents(ptpOffset float64, source string, iface
1042
1095
if iface != "" && iface != clockRealTime {
1043
1096
iface = utils .GetAlias (iface )
1044
1097
}
1098
+ if p .c != nil {
1099
+ return // no metrics when socket is used
1100
+ }
1045
1101
switch ptpState {
1046
1102
case event .PTP_LOCKED :
1047
1103
updateClockStateMetrics (p .name , iface , LOCKED )
@@ -1257,10 +1313,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) {
1257
1313
ExtendedSSM : 0 ,
1258
1314
})
1259
1315
state = sDeviceConfig .LastClockState
1260
- UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "SSM" , logEntry .QL )
1261
- UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "Extended SSM" , synce .QL_DEFAULT_ENHSSM )
1262
- UpdateSynceClockQlMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , int (logEntry .QL )+ int (synce .QL_DEFAULT_ENHSSM ))
1263
-
1316
+ if p .c == nil { // only update metrics if no socket is used
1317
+ UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "SSM" , logEntry .QL )
1318
+ UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "Extended SSM" , synce .QL_DEFAULT_ENHSSM )
1319
+ UpdateSynceClockQlMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , int (logEntry .QL )+ int (synce .QL_DEFAULT_ENHSSM ))
1320
+ }
1264
1321
} else if sDeviceConfig .ExtendedTlv == synce .ExtendedTLV_ENABLED {
1265
1322
var lastQLState * synce.QualityLevelInfo
1266
1323
var ok bool
@@ -1284,9 +1341,11 @@ func (p *ptpProcess) ProcessSynceEvents(logEntry synce.LogEntry) {
1284
1341
ExtendedSSM : lastQLState .ExtendedSSM ,
1285
1342
Priority : 0 ,
1286
1343
})
1287
- UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "SSM" , lastQLState .SSM )
1288
- UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "Extended SSM" , logEntry .ExtQl )
1289
- UpdateSynceClockQlMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , int (lastQLState .SSM )+ int (logEntry .ExtQl ))
1344
+ if p .c == nil {
1345
+ UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "SSM" , lastQLState .SSM )
1346
+ UpdateSynceQLMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , "Extended SSM" , logEntry .ExtQl )
1347
+ UpdateSynceClockQlMetrics (syncEProcessName , p .configName , iface , sDeviceConfig .NetworkOption , sDeviceConfig .Name , int (lastQLState .SSM )+ int (logEntry .ExtQl ))
1348
+ }
1290
1349
1291
1350
state = sDeviceConfig .LastClockState
1292
1351
} else if logEntry .QL != synce .QL_DEFAULT_SSM { //else we have only QL
0 commit comments