39
39
import org .opensearch .core .action .ActionListener ;
40
40
import org .opensearch .core .common .io .stream .Writeable ;
41
41
import org .opensearch .tasks .Task ;
42
+ import org .opensearch .threadpool .ThreadPool ;
42
43
import org .opensearch .transport .TransportService ;
44
+ import org .opensearch .wlm .QueryGroupTask ;
43
45
44
46
/**
45
47
* Perform the search scroll
@@ -51,24 +53,32 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
51
53
private final ClusterService clusterService ;
52
54
private final SearchTransportService searchTransportService ;
53
55
private final SearchPhaseController searchPhaseController ;
56
+ private final ThreadPool threadPool ;
54
57
55
58
@ Inject
56
59
public TransportSearchScrollAction (
57
60
TransportService transportService ,
58
61
ClusterService clusterService ,
59
62
ActionFilters actionFilters ,
60
63
SearchTransportService searchTransportService ,
61
- SearchPhaseController searchPhaseController
64
+ SearchPhaseController searchPhaseController ,
65
+ ThreadPool threadPool
62
66
) {
63
67
super (SearchScrollAction .NAME , transportService , actionFilters , (Writeable .Reader <SearchScrollRequest >) SearchScrollRequest ::new );
64
68
this .clusterService = clusterService ;
65
69
this .searchTransportService = searchTransportService ;
66
70
this .searchPhaseController = searchPhaseController ;
71
+ this .threadPool = threadPool ;
67
72
}
68
73
69
74
@ Override
70
75
protected void doExecute (Task task , SearchScrollRequest request , ActionListener <SearchResponse > listener ) {
71
76
try {
77
+
78
+ if (task instanceof QueryGroupTask ) {
79
+ ((QueryGroupTask ) task ).setQueryGroupId (threadPool .getThreadContext ());
80
+ }
81
+
72
82
ParsedScrollId scrollId = TransportSearchHelper .parseScrollId (request .scrollId ());
73
83
Runnable action ;
74
84
switch (scrollId .getType ()) {
0 commit comments