77
77
policy , operator_policy , effective_policy_definition , type , memory ,
78
78
consumers , segments ]).
79
79
80
%% After this many consecutive messages for which a consumer's filter
%% evaluated to false, filtering is paused and resumed via a
%% {resume_filtering, CTag} queue event, so the queue process can
%% service other mailbox messages and flush matched messages promptly.
-define(UNMATCHED_THRESHOLD, 200).
81
+
80
82
-type appender_seq () :: non_neg_integer ().
81
83
82
84
-type msg () :: term (). % % TODO: refine
83
85
84
86
-record (stream , {mode :: rabbit_queue_type :consume_mode (),
85
87
delivery_count :: none | rabbit_queue_type :delivery_count (),
86
88
credit :: rabbit_queue_type :credit (),
89
+ drain = false :: boolean (),
90
+ credit_reply_outstanding = false :: boolean (),
87
91
ack :: boolean (),
88
92
start_offset = 0 :: non_neg_integer (),
89
93
listening_offset = 0 :: non_neg_integer (),
95
99
% % reversed order until the consumer has more credits to consume them.
96
100
buffer_msgs_rev = [] :: [rabbit_amqqueue :qmsg ()],
97
101
filter :: rabbit_amqp_filter :expression (),
102
+ % % Number of consecutive messages for which the filter evaluated to false
103
+ unmatched = 0 :: non_neg_integer (),
104
+ filtering_paused = false :: boolean (),
98
105
reader_options :: map ()}).
99
106
100
107
-record (stream_client , {stream_id :: string (),
@@ -513,39 +520,22 @@ credit_v1(_, _, _, _, _) ->
513
520
%% Handle a consumer credit top-up (AMQP 1.0 link flow).
%% Computes the sender-side link credit from the receiver's
%% delivery-count/link-credit view, stores the drain flag, marks a
%% credit_reply as outstanding, then attempts to deliver as many ready
%% messages as the new credit allows. actions/3 appends drain
%% accounting and the credit_reply action to the delivery actions.
%% An unknown consumer tag leaves the state unchanged with no actions.
credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain,
       #stream_client{readers = Readers,
                      name = Name,
                      local_pid = LocalPid} = State) ->
    case Readers of
        #{CTag := Str0 = #stream{delivery_count = DeliveryCountSnd}} ->
            LinkCreditSnd = amqp10_util:link_credit_snd(
                              DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd),
            Str1 = Str0#stream{credit = LinkCreditSnd,
                               drain = Drain,
                               credit_reply_outstanding = true},
            {Str2, Msgs} = stream_entries(QName, Name, CTag, LocalPid, Str1),
            {Str, Actions} = actions(CTag, Msgs, Str2),
            {State#stream_client{readers = maps:update(CTag, Str, Readers)},
             Actions};
        _ ->
            %% Unknown consumer tag: nothing to do.
            {State, []}
    end.
543
538
544
- % % Returns only an approximation.
545
- available_messages (# stream {log = Log ,
546
- last_consumed_offset = LastConsumedOffset }) ->
547
- max (0 , osiris_log :committed_offset (Log ) - LastConsumedOffset ).
548
-
549
539
deliver (QSs , Msg , Options ) ->
550
540
lists :foldl (
551
541
fun ({Q , stateless }, {Qs , Actions }) ->
@@ -624,17 +614,34 @@ handle_event(_QName, {osiris_written, From, _WriterId, Corrs},
624
614
slow = Slow },
625
615
{ok , State , Actions };
626
616
handle_event (QName , {osiris_offset , _From , _Offs },
627
- State = # stream_client {local_pid = LocalPid ,
628
- readers = Readers0 ,
629
- name = Name }) ->
617
+ State0 = # stream_client {local_pid = LocalPid ,
618
+ readers = Readers0 ,
619
+ name = Name }) ->
630
620
% % offset isn't actually needed as we use the atomic to read the
631
621
% % current committed
632
622
{Readers , Actions } = maps :fold (
633
623
fun (Tag , Str0 , {Rds , As }) ->
634
- {Str , Msgs } = stream_entries (QName , Name , LocalPid , Str0 ),
635
- {Rds #{Tag => Str }, deliver_actions (Tag , Str # stream .ack , Msgs ) ++ As }
636
- end , {#{}, []}, Readers0 ),
637
- {ok , State # stream_client {readers = Readers }, Actions };
624
+ {Str1 , Msgs } = stream_entries (QName , Name , Tag , LocalPid , Str0 ),
625
+ {Str , As1 } = actions (Tag , Msgs , Str1 ),
626
+ {[{Tag , Str } | Rds ], As1 ++ As }
627
+ end , {[], []}, Readers0 ),
628
+ State = State0 # stream_client {readers = maps :from_list (Readers )},
629
+ {ok , State , Actions };
630
+ handle_event (QName , {resume_filtering , CTag },
631
+ # stream_client {name = Name ,
632
+ local_pid = LocalPid ,
633
+ readers = Readers0 } = State ) ->
634
+ case Readers0 of
635
+ #{CTag := Str0 } ->
636
+ Str1 = Str0 # stream {unmatched = 0 ,
637
+ filtering_paused = false },
638
+ {Str2 , Msgs } = stream_entries (QName , Name , CTag , LocalPid , Str1 ),
639
+ {Str , Actions } = actions (CTag , Msgs , Str2 ),
640
+ Readers = maps :update (CTag , Str , Readers0 ),
641
+ {ok , State # stream_client {readers = Readers }, Actions };
642
+ _ ->
643
+ {ok , State , []}
644
+ end ;
638
645
handle_event (_QName , {stream_leader_change , Pid }, State ) ->
639
646
{ok , update_leader_pid (Pid , State ), []};
640
647
handle_event (_QName , {stream_local_member_change , Pid },
@@ -690,7 +697,7 @@ settle(QName, _, CTag, MsgIds, #stream_client{readers = Readers0,
690
697
% % all settle reasons will "give credit" to the stream queue
691
698
Credit = length (MsgIds ),
692
699
Str1 = Str0 # stream {credit = Credit0 + Credit },
693
- {Str , Msgs } = stream_entries (QName , Name , LocalPid , Str1 ),
700
+ {Str , Msgs } = stream_entries (QName , Name , CTag , LocalPid , Str1 ),
694
701
Readers = maps :update (CTag , Str , Readers0 ),
695
702
{State # stream_client {readers = Readers },
696
703
deliver_actions (CTag , Ack , Msgs )};
@@ -1132,7 +1139,10 @@ add_if_defined(Key, Value, Map) ->
1132
1139
maps :put (Key , Value , Map ).
1133
1140
1134
1141
%% Wrap an osiris event as the gen_server cast message the queue's
%% channel/session process expects for queue events.
format_osiris_event(Evt, QRef) ->
    {'$gen_cast', queue_event(QRef, Evt)}.

%% Build a queue_event tuple; also used to self-cast
%% {resume_filtering, CTag} when filtering is paused.
queue_event(QRef, Evt) ->
    {queue_event, QRef, Evt}.
1136
1146
1137
1147
max_age (undefined ) ->
1138
1148
undefined ;
@@ -1159,21 +1169,21 @@ recover(Q) ->
1159
1169
%% Send a command to the channel process, unless there is no reply to send.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
1161
1171
1162
- stream_entries (QName , Name , LocalPid ,
1172
+ stream_entries (QName , Name , CTag , LocalPid ,
1163
1173
# stream {chunk_iterator = undefined ,
1164
1174
credit = Credit } = Str0 ) ->
1165
1175
case Credit > 0 of
1166
1176
true ->
1167
1177
case chunk_iterator (Str0 , LocalPid ) of
1168
1178
{ok , Str } ->
1169
- stream_entries (QName , Name , LocalPid , Str );
1179
+ stream_entries (QName , Name , CTag , LocalPid , Str );
1170
1180
{end_of_stream , Str } ->
1171
1181
{Str , []}
1172
1182
end ;
1173
1183
false ->
1174
1184
{Str0 , []}
1175
1185
end ;
1176
- stream_entries (QName , Name , LocalPid ,
1186
+ stream_entries (QName , Name , CTag , LocalPid ,
1177
1187
# stream {delivery_count = DC ,
1178
1188
credit = Credit ,
1179
1189
buffer_msgs_rev = Buf0 ,
@@ -1194,40 +1204,49 @@ stream_entries(QName, Name, LocalPid,
1194
1204
credit = Credit - BufLen ,
1195
1205
buffer_msgs_rev = [],
1196
1206
last_consumed_offset = LastOff + BufLen },
1197
- stream_entries (QName , Name , LocalPid , Str , Buf0 )
1207
+ stream_entries (QName , Name , CTag , LocalPid , Str , Buf0 )
1198
1208
end ;
1199
- stream_entries (QName , Name , LocalPid , Str ) ->
1200
- stream_entries (QName , Name , LocalPid , Str , []).
1209
+ stream_entries (QName , Name , CTag , LocalPid , Str ) ->
1210
+ stream_entries (QName , Name , CTag , LocalPid , Str , []).
1201
1211
1202
- stream_entries (_ , _ , _ , # stream {credit = Credit } = Str , Acc )
1212
+ stream_entries (_ , _ , _ , _ , # stream {credit = Credit } = Str , Acc )
1203
1213
when Credit < 1 ->
1204
1214
{Str , lists :reverse (Acc )};
1205
- stream_entries (QName , Name , LocalPid ,
1215
+ stream_entries (QName , Name , CTag , LocalPid ,
1206
1216
# stream {chunk_iterator = Iter0 ,
1207
1217
delivery_count = DC ,
1208
1218
credit = Credit ,
1209
1219
start_offset = StartOffset ,
1210
- filter = Filter } = Str0 , Acc0 ) ->
1220
+ filter = Filter ,
1221
+ unmatched = Unmatched } = Str0 , Acc0 ) ->
1211
1222
case osiris_log :iterator_next (Iter0 ) of
1223
+ end_of_chunk when Unmatched > ? UNMATCHED_THRESHOLD ->
1224
+ % % Pause filtering temporariliy for two reasons:
1225
+ % % 1. Process Erlang messages in our mailbox to avoid blocking other links
1226
+ % % 2. Send matched messages to the receiver as soon as possible
1227
+ gen_server :cast (self (), queue_event (QName , {resume_filtering , CTag })),
1228
+ {Str0 # stream {filtering_paused = true }, lists :reverse (Acc0 )};
1212
1229
end_of_chunk ->
1213
1230
case chunk_iterator (Str0 , LocalPid ) of
1214
1231
{ok , Str } ->
1215
- stream_entries (QName , Name , LocalPid , Str , Acc0 );
1232
+ stream_entries (QName , Name , CTag , LocalPid , Str , Acc0 );
1216
1233
{end_of_stream , Str } ->
1217
1234
{Str , lists :reverse (Acc0 )}
1218
1235
end ;
1219
1236
{{Offset , Entry }, Iter } ->
1220
1237
{Str , Acc } = case Entry of
1221
1238
{batch , _NumRecords , 0 , _Len , BatchedEntries } ->
1222
- {MsgsRev , NumMsgs } = parse_uncompressed_subbatch (
1223
- BatchedEntries , Offset , StartOffset ,
1224
- QName , Name , LocalPid , Filter , {[], 0 }),
1239
+ {MsgsRev , NumMsgs , U } = parse_uncompressed_subbatch (
1240
+ BatchedEntries , Offset , StartOffset ,
1241
+ QName , Name , LocalPid , Filter ,
1242
+ {[], 0 , Unmatched }),
1225
1243
case Credit >= NumMsgs of
1226
1244
true ->
1227
1245
{Str0 # stream {chunk_iterator = Iter ,
1228
1246
delivery_count = delivery_count_add (DC , NumMsgs ),
1229
1247
credit = Credit - NumMsgs ,
1230
- last_consumed_offset = Offset + NumMsgs - 1 },
1248
+ last_consumed_offset = Offset + NumMsgs - 1 ,
1249
+ unmatched = U },
1231
1250
MsgsRev ++ Acc0 };
1232
1251
false ->
1233
1252
% % Consumer doesn't have sufficient credit.
@@ -1238,7 +1257,8 @@ stream_entries(QName, Name, LocalPid,
1238
1257
delivery_count = delivery_count_add (DC , Credit ),
1239
1258
credit = 0 ,
1240
1259
buffer_msgs_rev = Buf ,
1241
- last_consumed_offset = Offset + Credit - 1 },
1260
+ last_consumed_offset = Offset + Credit - 1 ,
1261
+ unmatched = U },
1242
1262
MsgsRev1 ++ Acc0 }
1243
1263
end ;
1244
1264
{batch , _ , _CompressionType , _ , _ } ->
@@ -1252,20 +1272,22 @@ stream_entries(QName, Name, LocalPid,
1252
1272
Name , LocalPid , Filter ) of
1253
1273
none ->
1254
1274
{Str0 # stream {chunk_iterator = Iter ,
1255
- last_consumed_offset = Offset },
1275
+ last_consumed_offset = Offset ,
1276
+ unmatched = Unmatched + 1 },
1256
1277
Acc0 };
1257
1278
Msg ->
1258
1279
{Str0 # stream {chunk_iterator = Iter ,
1259
1280
delivery_count = delivery_count_add (DC , 1 ),
1260
1281
credit = Credit - 1 ,
1261
- last_consumed_offset = Offset },
1282
+ last_consumed_offset = Offset ,
1283
+ unmatched = 0 },
1262
1284
[Msg | Acc0 ]}
1263
1285
end ;
1264
1286
false ->
1265
1287
{Str0 # stream {chunk_iterator = Iter }, Acc0 }
1266
1288
end
1267
1289
end ,
1268
- stream_entries (QName , Name , LocalPid , Str , Acc )
1290
+ stream_entries (QName , Name , CTag , LocalPid , Str , Acc )
1269
1291
end .
1270
1292
1271
1293
chunk_iterator (# stream {credit = Credit ,
@@ -1300,14 +1322,14 @@ parse_uncompressed_subbatch(
1300
1322
Len :31 /unsigned ,
1301
1323
Entry :Len /binary ,
1302
1324
Rem /binary >>,
1303
- Offset , StartOffset , QName , Name , LocalPid , Filter , Acc0 = {AccList , AccCount }) ->
1325
+ Offset , StartOffset , QName , Name , LocalPid , Filter , Acc0 = {AccList , AccCount , Unmatched }) ->
1304
1326
Acc = case Offset >= StartOffset of
1305
1327
true ->
1306
1328
case entry_to_msg (Entry , Offset , QName , Name , LocalPid , Filter ) of
1307
1329
none ->
1308
- Acc0 ;
1330
+ setelement ( 3 , Acc0 , Unmatched + 1 ) ;
1309
1331
Msg ->
1310
- {[Msg | AccList ], AccCount + 1 }
1332
+ {[Msg | AccList ], AccCount + 1 , 0 }
1311
1333
end ;
1312
1334
false ->
1313
1335
Acc0
%% True when fewer than a quorum (strict majority) of All members are Up.
is_minority(All, Up) ->
    Quorum = length(All) div 2 + 1,
    length(Up) < Quorum.
1443
%% Build the action list for a consumer after a delivery attempt:
%% deliveries first, then (possibly) drain accounting and a
%% credit_reply produced by maybe_drain/1 and credit_reply/2.
actions(CTag, Msgs, #stream{ack = Ack} = Str0) ->
    Str1 = maybe_drain(Str0),
    {Str, Actions} = credit_reply(CTag, Str1),
    {Str, deliver_actions(CTag, Ack, Msgs) ++ Actions}.
1447
+
1448
%% If the receiver requested drain and credit remains after delivering
%% all available messages, consume the leftover credit by advancing the
%% delivery-count (serial-number arithmetic) and zeroing the credit.
%% While filtering is paused the drain is deferred until it resumes.
maybe_drain(#stream{delivery_count = DeliveryCount,
                    credit = Credit,
                    drain = true,
                    filtering_paused = false} = Str)
  when Credit > 0 ->
    Str#stream{delivery_count = serial_number:add(DeliveryCount, Credit),
               credit = 0};
maybe_drain(Str) ->
    Str.
1457
+
1458
%% Emit the pending credit_reply action for the consumer, clearing the
%% outstanding flag. No reply is produced when none is outstanding or
%% while filtering is paused (the reply is sent after filtering resumes).
credit_reply(CTag, #stream{delivery_count = DeliveryCount,
                           credit = Credit,
                           drain = Drain,
                           credit_reply_outstanding = true,
                           filtering_paused = false} = Str) ->
    {Str#stream{credit_reply_outstanding = false},
     [{credit_reply, CTag, DeliveryCount, Credit,
       available_messages(Str), Drain}]};
credit_reply(_, Str) ->
    {Str, []}.
1468
+
1469
%% Number of committed messages not yet consumed on this stream.
%% Returns only an approximation.
available_messages(#stream{log = Log,
                           last_consumed_offset = LastConsumedOffset}) ->
    max(0, osiris_log:committed_offset(Log) - LastConsumedOffset).
1473
+
1421
1474
deliver_actions (_ , _ , []) ->
1422
1475
[];
1423
1476
deliver_actions (CTag , Ack , Msgs ) ->
0 commit comments