@@ -94,7 +94,10 @@ class FlightTransport extends TcpTransport {
94
94
private final ExecutorService clientExecutor ;
95
95
96
96
private final ThreadPool threadPool ;
97
- private BufferAllocator allocator ;
97
+ private RootAllocator rootAllocator ;
98
+ private BufferAllocator serverAllocator ;
99
+ private BufferAllocator clientAllocator ;
100
+
98
101
private final NamedWriteableRegistry namedWriteableRegistry ;
99
102
private final FlightStatsCollector statsCollector ;
100
103
private final FlightTransportConfig config = new FlightTransportConfig ();
@@ -136,12 +139,14 @@ public FlightTransport(
136
139
protected void doStart () {
137
140
boolean success = false ;
138
141
try {
139
- allocator = AccessController .doPrivileged ((PrivilegedAction <BufferAllocator >) () -> new RootAllocator (Integer .MAX_VALUE ));
142
+ rootAllocator = AccessController .doPrivileged ((PrivilegedAction <RootAllocator >) () -> new RootAllocator (Integer .MAX_VALUE ));
143
+ serverAllocator = rootAllocator .newChildAllocator ("server" , 0 , rootAllocator .getLimit ());
144
+ clientAllocator = rootAllocator .newChildAllocator ("client" , 0 , rootAllocator .getLimit ());
140
145
if (statsCollector != null ) {
141
- statsCollector .setBufferAllocator (allocator );
146
+ statsCollector .setBufferAllocator (rootAllocator );
142
147
statsCollector .setThreadPool (threadPool );
143
148
}
144
- flightProducer = new ArrowFlightProducer (this , allocator , SERVER_HEADER_KEY , statsCollector );
149
+ flightProducer = new ArrowFlightProducer (this , rootAllocator , SERVER_HEADER_KEY , statsCollector );
145
150
bindServer ();
146
151
success = true ;
147
152
if (statsCollector != null ) {
@@ -215,7 +220,7 @@ private List<InetSocketAddress> bindToPort(InetAddress[] hostAddresses) {
215
220
// Create single FlightServer with all locations
216
221
ServerHeaderMiddleware .Factory factory = new ServerHeaderMiddleware .Factory ();
217
222
OSFlightServer .Builder builder = OSFlightServer .builder ()
218
- .allocator (allocator . newChildAllocator ( "server" , 0 , Long . MAX_VALUE ) )
223
+ .allocator (serverAllocator )
219
224
.producer (flightProducer )
220
225
.sslContext (sslContextProvider != null ? sslContextProvider .getServerSslContext () : null )
221
226
.channelType (ServerConfig .serverChannelType ())
@@ -256,11 +261,13 @@ protected void stopInternal() {
256
261
flightServer .close ();
257
262
flightServer = null ;
258
263
}
264
+ serverAllocator .close ();
259
265
for (ClientHolder holder : flightClients .values ()) {
260
266
holder .flightClient ().close ();
261
267
}
262
- allocator .close ();
263
268
flightClients .clear ();
269
+ clientAllocator .close ();
270
+ rootAllocator .close ();
264
271
gracefullyShutdownELG (bossEventLoopGroup , "os-grpc-boss-ELG" );
265
272
gracefullyShutdownELG (workerEventLoopGroup , "os-grpc-worker-ELG" );
266
273
if (statsCollector != null ) {
@@ -297,7 +304,7 @@ protected TcpChannel initiateChannel(DiscoveryNode node) throws IOException {
297
304
ClientHeaderMiddleware .Factory factory = new ClientHeaderMiddleware .Factory (context , getVersion ());
298
305
FlightClient client = OSFlightClient .builder ()
299
306
// TODO configure initial and max reservation setting per client
300
- .allocator (allocator . newChildAllocator ( "client-" + nodeId , 0 , Long . MAX_VALUE ) )
307
+ .allocator (clientAllocator )
301
308
.location (location )
302
309
.channelType (ServerConfig .clientChannelType ())
303
310
.eventLoopGroup (workerEventLoopGroup )
@@ -307,7 +314,6 @@ protected TcpChannel initiateChannel(DiscoveryNode node) throws IOException {
307
314
.build ();
308
315
return new ClientHolder (location , client , context );
309
316
});
310
-
311
317
FlightClientChannel channel = new FlightClientChannel (
312
318
boundAddress ,
313
319
holder .flightClient (),
0 commit comments