@@ -12,44 +12,46 @@ namespace Microsoft.Diagnostics.Tracing.EventPipe
12
12
13
13
internal class EventCache
14
14
{
15
+ public EventCache ( EventPipeEventSource source , ThreadCache threads )
16
+ {
17
+ _source = source ;
18
+ _threads = threads ;
19
+ }
20
+
15
21
public event ParseBufferItemFunction OnEvent ;
16
22
public event Action < int > OnEventsDropped ;
17
23
18
- public unsafe void ProcessEventBlock ( byte [ ] eventBlockData )
24
+ public void ProcessEventBlock ( byte [ ] eventBlockData , long streamOffset )
19
25
{
26
+ PinnedBuffer buffer = new PinnedBuffer ( eventBlockData ) ;
27
+ SpanReader reader = new SpanReader ( eventBlockData , streamOffset ) ;
28
+
20
29
// parse the header
21
- if ( eventBlockData . Length < 20 )
30
+ if ( eventBlockData . Length < 20 )
22
31
{
23
- Debug . Assert ( false , "Expected EventBlock of at least 20 bytes" ) ;
24
- return ;
32
+ throw new FormatException ( "Expected EventBlock of at least 20 bytes" ) ;
25
33
}
26
- ushort headerSize = BitConverter . ToUInt16 ( eventBlockData , 0 ) ;
34
+ ushort headerSize = reader . ReadUInt16 ( ) ;
27
35
if ( headerSize < 20 || headerSize > eventBlockData . Length )
28
36
{
29
- Debug . Assert ( false , "Invalid EventBlock header size" ) ;
30
- return ;
37
+ throw new FormatException ( "Invalid EventBlock header size" ) ;
31
38
}
32
- ushort flags = BitConverter . ToUInt16 ( eventBlockData , 2 ) ;
39
+ ushort flags = reader . ReadUInt16 ( ) ;
33
40
bool useHeaderCompression = ( flags & ( ushort ) EventBlockFlags . HeaderCompression ) != 0 ;
34
41
42
+ // skip the rest of the header
43
+ reader . ReadBytes ( headerSize - 4 ) ;
44
+
35
45
// parse the events
36
- PinnedBuffer buffer = new PinnedBuffer ( eventBlockData ) ;
37
- byte * cursor = ( byte * ) buffer . PinningHandle . AddrOfPinnedObject ( ) ;
38
- byte * end = cursor + eventBlockData . Length ;
39
- cursor += headerSize ;
40
46
EventMarker eventMarker = new EventMarker ( buffer ) ;
41
47
long timestamp = 0 ;
42
- EventPipeEventHeader . ReadFromFormatV4 ( cursor , useHeaderCompression , ref eventMarker . Header ) ;
43
- if ( ! _threads . TryGetValue ( eventMarker . Header . CaptureThreadId , out EventCacheThread thread ) )
44
- {
45
- thread = new EventCacheThread ( ) ;
46
- thread . SequenceNumber = eventMarker . Header . SequenceNumber - 1 ;
47
- AddThread ( eventMarker . Header . CaptureThreadId , thread ) ;
48
- }
48
+ SpanReader tempReader = reader ;
49
+ _source . ReadEventHeader ( ref tempReader , useHeaderCompression , ref eventMarker . Header ) ;
50
+ EventPipeThread thread = _threads . GetOrAddThread ( eventMarker . Header . CaptureThreadIndexOrId , eventMarker . Header . SequenceNumber - 1 ) ;
49
51
eventMarker = new EventMarker ( buffer ) ;
50
- while ( cursor < end )
52
+ while ( reader . RemainingBytes . Length > 0 )
51
53
{
52
- EventPipeEventHeader . ReadFromFormatV4 ( cursor , useHeaderCompression , ref eventMarker . Header ) ;
54
+ _source . ReadEventHeader ( ref reader , useHeaderCompression , ref eventMarker . Header ) ;
53
55
bool isSortedEvent = eventMarker . Header . IsSorted ;
54
56
timestamp = eventMarker . Header . TimeStamp ;
55
57
int sequenceNumber = eventMarker . Header . SequenceNumber ;
@@ -58,26 +60,10 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData)
58
60
thread . LastCachedEventTimestamp = timestamp ;
59
61
60
62
// sorted events are the only time the captureThreadId should change
61
- long captureThreadId = eventMarker . Header . CaptureThreadId ;
62
- if ( ! _threads . TryGetValue ( captureThreadId , out thread ) )
63
- {
64
- thread = new EventCacheThread ( ) ;
65
- thread . SequenceNumber = sequenceNumber - 1 ;
66
- AddThread ( captureThreadId , thread ) ;
67
- }
68
- }
69
-
70
- int droppedEvents = ( int ) Math . Min ( int . MaxValue , sequenceNumber - thread . SequenceNumber - 1 ) ;
71
- if ( droppedEvents > 0 )
72
- {
73
- OnEventsDropped ? . Invoke ( droppedEvents ) ;
74
- }
75
- else
76
- {
77
- // When a thread id is recycled the sequenceNumber can abruptly reset to 1 which
78
- // makes droppedEvents go negative
79
- Debug . Assert ( droppedEvents == 0 || sequenceNumber == 1 ) ;
63
+ long captureThreadId = eventMarker . Header . CaptureThreadIndexOrId ;
64
+ thread = _threads . GetOrAddThread ( captureThreadId , sequenceNumber - 1 ) ;
80
65
}
66
+ NotifyDroppedEventsIfNeeded ( thread . SequenceNumber , sequenceNumber - 1 ) ;
81
67
thread . SequenceNumber = sequenceNumber ;
82
68
83
69
if ( isSortedEvent )
@@ -91,81 +77,88 @@ public unsafe void ProcessEventBlock(byte[] eventBlockData)
91
77
92
78
}
93
79
94
- cursor += eventMarker . Header . TotalNonHeaderSize + eventMarker . Header . HeaderSize ;
80
+ reader . ReadBytes ( eventMarker . Header . PayloadSize ) ;
95
81
EventMarker lastEvent = eventMarker ;
96
82
eventMarker = new EventMarker ( buffer ) ;
97
83
eventMarker . Header = lastEvent . Header ;
98
84
}
99
85
thread . LastCachedEventTimestamp = timestamp ;
100
86
}
101
87
102
- public unsafe void ProcessSequencePointBlock ( byte [ ] sequencePointBytes )
88
+ public void ProcessSequencePointBlockV5OrLess ( byte [ ] sequencePointBytes , long streamOffset )
103
89
{
104
- const int SizeOfTimestampAndThreadCount = 12 ;
105
- const int SizeOfThreadIdAndSequenceNumber = 12 ;
106
- if ( sequencePointBytes . Length < SizeOfTimestampAndThreadCount )
90
+ SpanReader reader = new SpanReader ( sequencePointBytes , streamOffset ) ;
91
+ long timestamp = ( long ) reader . ReadUInt64 ( ) ;
92
+ int threadCount = ( int ) reader . ReadUInt32 ( ) ;
93
+ Flush ( timestamp ) ;
94
+ TrimEventsAfterSequencePoint ( ) ;
95
+ for ( int i = 0 ; i < threadCount ; i ++ )
107
96
{
108
- Debug . Assert ( false , "Bad sequence point block length" ) ;
109
- return ;
97
+ long captureThreadId = ( long ) reader . ReadUInt64 ( ) ;
98
+ int sequenceNumber = ( int ) reader . ReadUInt32 ( ) ;
99
+ CheckpointThread ( captureThreadId , sequenceNumber ) ;
110
100
}
111
- long timestamp = BitConverter . ToInt64 ( sequencePointBytes , 0 ) ;
112
- int threadCount = BitConverter . ToInt32 ( sequencePointBytes , 8 ) ;
113
- if ( sequencePointBytes . Length < SizeOfTimestampAndThreadCount + threadCount * SizeOfThreadIdAndSequenceNumber )
101
+ }
102
+
103
+ enum SequencePointFlags : uint
104
+ {
105
+ FlushThreads = 1
106
+ }
107
+
108
+ public void ProcessSequencePointBlockV6OrGreater ( byte [ ] sequencePointBytes , long streamOffset )
109
+ {
110
+ SpanReader reader = new SpanReader ( sequencePointBytes , streamOffset ) ;
111
+ long timestamp = ( long ) reader . ReadUInt64 ( ) ;
112
+ uint flags = reader . ReadUInt32 ( ) ;
113
+ int threadCount = ( int ) reader . ReadUInt32 ( ) ;
114
+ Flush ( timestamp ) ;
115
+ TrimEventsAfterSequencePoint ( ) ;
116
+ for ( int i = 0 ; i < threadCount ; i ++ )
114
117
{
115
- Debug . Assert ( false , "Bad sequence point block length" ) ;
116
- return ;
118
+ long captureThreadIndex = ( long ) reader . ReadVarUInt64 ( ) ;
119
+ int sequenceNumber = ( int ) reader . ReadVarUInt32 ( ) ;
120
+ CheckpointThread ( captureThreadIndex , sequenceNumber ) ;
117
121
}
122
+
123
+ if ( ( flags & ( uint ) SequencePointFlags . FlushThreads ) != 0 )
124
+ {
125
+ _threads . Flush ( ) ;
126
+ }
127
+ }
128
+
129
+ /// <summary>
130
+ /// After all events have been parsed we could have some straglers that weren't
131
+ /// earlier than any sorted event. Sort and dispatch those now.
132
+ /// </summary>
133
+ public void Flush ( ) => Flush ( long . MaxValue ) ;
134
+
135
+ private void Flush ( long timestamp )
136
+ {
118
137
SortAndDispatch ( timestamp ) ;
119
- foreach ( EventCacheThread thread in _threads . Values )
138
+ CheckForPendingThreadRemoval ( ) ;
139
+ }
140
+
141
+ private void TrimEventsAfterSequencePoint ( )
142
+ {
143
+ foreach ( EventPipeThread thread in _threads . Values )
120
144
{
121
145
Debug . Assert ( thread . Events . Count == 0 , "There shouldn't be any pending events after a sequence point" ) ;
122
146
thread . Events . Clear ( ) ;
123
147
thread . Events . TrimExcess ( ) ;
124
148
}
149
+ }
125
150
126
- int cursor = SizeOfTimestampAndThreadCount ;
127
- for ( int i = 0 ; i < threadCount ; i ++ )
151
+ private void CheckForPendingThreadRemoval ( )
152
+ {
153
+ foreach ( var thread in _threads . Values )
128
154
{
129
- long captureThreadId = BitConverter . ToInt64 ( sequencePointBytes , cursor ) ;
130
- int sequenceNumber = BitConverter . ToInt32 ( sequencePointBytes , cursor + 8 ) ;
131
- if ( ! _threads . TryGetValue ( captureThreadId , out EventCacheThread thread ) )
155
+ if ( thread . RemovalPending && thread . Events . Count == 0 )
132
156
{
133
- if ( sequenceNumber > 0 )
134
- {
135
- OnEventsDropped ? . Invoke ( sequenceNumber ) ;
136
- }
137
- thread = new EventCacheThread ( ) ;
138
- thread . SequenceNumber = sequenceNumber ;
139
- AddThread ( captureThreadId , thread ) ;
140
- }
141
- else
142
- {
143
- int droppedEvents = unchecked ( sequenceNumber - thread . SequenceNumber ) ;
144
- if ( droppedEvents > 0 )
145
- {
146
- OnEventsDropped ? . Invoke ( droppedEvents ) ;
147
- }
148
- else
149
- {
150
- // When a thread id is recycled the sequenceNumber can abruptly reset to 1 which
151
- // makes droppedEvents go negative
152
- Debug . Assert ( droppedEvents == 0 || sequenceNumber == 1 ) ;
153
- }
154
- thread . SequenceNumber = sequenceNumber ;
157
+ _threads . RemoveThread ( thread . ThreadId ) ;
155
158
}
156
- cursor += SizeOfThreadIdAndSequenceNumber ;
157
159
}
158
160
}
159
161
160
- /// <summary>
161
- /// After all events have been parsed we could have some straglers that weren't
162
- /// earlier than any sorted event. Sort and dispatch those now.
163
- /// </summary>
164
- public void Flush ( )
165
- {
166
- SortAndDispatch ( long . MaxValue ) ;
167
- }
168
-
169
162
private unsafe void SortAndDispatch ( long stopTimestamp )
170
163
{
171
164
// This sort could be made faster by using a min-heap but this is a simple place to start
@@ -203,7 +196,7 @@ private unsafe void SortAndDispatch(long stopTimestamp)
203
196
// from the cache or memory usage will grow unbounded. AddThread handles the
204
197
// the thread objects but the storage for the queue elements also does not shrink
205
198
// below the high water mark unless we free it explicitly.
206
- foreach ( Queue < EventMarker > q in threadQueues )
199
+ foreach ( Queue < EventMarker > q in threadQueues )
207
200
{
208
201
if ( q . Count == 0 )
209
202
{
@@ -212,45 +205,46 @@ private unsafe void SortAndDispatch(long stopTimestamp)
212
205
}
213
206
}
214
207
215
- private void AddThread ( long captureThreadId , EventCacheThread thread )
208
+ public void CheckpointThreadAndPendRemoval ( long threadIndex , int sequenceNumber )
209
+ {
210
+ EventPipeThread thread = _threads . GetThread ( threadIndex ) ;
211
+ CheckpointThread ( thread , sequenceNumber ) ;
212
+ thread . RemovalPending = true ;
213
+ }
214
+
215
+ public void CheckpointThread ( long threadIndex , int sequenceNumber )
216
+ {
217
+ EventPipeThread thread = _threads . GetOrAddThread ( threadIndex , sequenceNumber ) ;
218
+ CheckpointThread ( thread , sequenceNumber ) ;
219
+ }
220
+
221
+ private void CheckpointThread ( EventPipeThread thread , int sequenceNumber )
222
+ {
223
+ NotifyDroppedEventsIfNeeded ( thread . SequenceNumber , sequenceNumber ) ;
224
+ thread . SequenceNumber = sequenceNumber ;
225
+ }
226
+
227
+ private void NotifyDroppedEventsIfNeeded ( int sequenceNumber , int expectedSequenceNumber )
216
228
{
217
- // To ensure we don't have unbounded growth we evict old threads to make room
218
- // for new ones. Evicted threads can always be re-added later if they log again
219
- // but there are two consequences:
220
- // a) We won't detect lost events on that thread after eviction
221
- // b) If the thread still had events pending dispatch they will be lost
222
- // We pick the thread that has gone the longest since it last logged an event
223
- // under the presumption that it is probably dead, has no events, and won't
224
- // log again.
225
- //
226
- // In the future if we had explicit thread death notification events we could keep
227
- // this cache leaner.
228
- if ( _threads . Count >= 5000 )
229
+ // Either events were dropped or the sequence number was reset because the thread ID was recycled.
230
+ // V6 format never recycles thread indexes but prior formats do. We assume heuristically that if an event or sequence
231
+ // point implies the last sequenceNumber was zero then the thread was recycled.
232
+ if ( _source . FileFormatVersionNumber >= 6 || sequenceNumber != 0 )
229
233
{
230
- long oldestThreadCaptureId = - 1 ;
231
- long smallestTimestamp = long . MaxValue ;
232
- foreach ( var kv in _threads )
234
+ int droppedEvents = unchecked ( expectedSequenceNumber - sequenceNumber ) ;
235
+ if ( droppedEvents < 0 )
233
236
{
234
- if ( kv . Value . LastCachedEventTimestamp < smallestTimestamp )
235
- {
236
- smallestTimestamp = kv . Value . LastCachedEventTimestamp ;
237
- oldestThreadCaptureId = kv . Key ;
238
- }
237
+ droppedEvents = int . MaxValue ;
238
+ }
239
+ if ( droppedEvents > 0 )
240
+ {
241
+ OnEventsDropped ? . Invoke ( droppedEvents ) ;
239
242
}
240
- Debug . Assert ( oldestThreadCaptureId != - 1 ) ;
241
- _threads . Remove ( oldestThreadCaptureId ) ;
242
243
}
243
- _threads [ captureThreadId ] = thread ;
244
244
}
245
245
246
- Dictionary < long , EventCacheThread > _threads = new Dictionary < long , EventCacheThread > ( ) ;
247
- }
248
-
249
- internal class EventCacheThread
250
- {
251
- public Queue < EventMarker > Events = new Queue < EventMarker > ( ) ;
252
- public int SequenceNumber ;
253
- public long LastCachedEventTimestamp ;
246
+ EventPipeEventSource _source ;
247
+ ThreadCache _threads ;
254
248
}
255
249
256
250
internal class EventMarker
0 commit comments