@@ -157,7 +157,7 @@ private class WriteBarrier implements CoreSubscriber<T>, Subscription, Publisher
157
157
private long demandBeforeReadyToWrite ;
158
158
159
159
/** Current state. */
160
- private State state = State .NEW ;
160
+ private volatile State state = State .NEW ;
161
161
162
162
/** The actual writeSubscriber from the HTTP server adapter. */
163
163
@ Nullable
@@ -198,7 +198,7 @@ else if (this.state == State.NEW) {
198
198
try {
199
199
result = ChannelSendOperator .this .writeFunction .apply (this );
200
200
}
201
- catch (Throwable ex ) {
201
+ catch (Throwable ex ) { // NOSONAR
202
202
this .writeCompletionBarrier .onError (ex );
203
203
return ;
204
204
}
@@ -214,8 +214,9 @@ else if (this.state == State.NEW) {
214
214
}
215
215
216
216
private Subscriber <? super T > requiredWriteSubscriber () {
217
- Assert .state (this .writeSubscriber != null , "No write subscriber" );
218
- return this .writeSubscriber ;
217
+ Subscriber <? super T > writeSubscriberToReturn = this .writeSubscriber ;
218
+ Assert .state (writeSubscriberToReturn != null , "No write subscriber" );
219
+ return writeSubscriberToReturn ;
219
220
}
220
221
221
222
@ Override
@@ -255,7 +256,7 @@ else if (this.state == State.NEW) {
255
256
try {
256
257
result = ChannelSendOperator .this .writeFunction .apply (this );
257
258
}
258
- catch (Throwable ex ) {
259
+ catch (Throwable ex ) { // NOSONAR
259
260
this .writeCompletionBarrier .onError (ex );
260
261
return ;
261
262
}
@@ -277,27 +278,28 @@ public Context currentContext() {
277
278
278
279
@ Override
279
280
public void request (long n ) {
281
+ long requests = n ;
280
282
Subscription s = this .subscription ;
281
283
if (s == null ) {
282
284
return ;
283
285
}
284
286
if (this .state == State .READY_TO_WRITE ) {
285
- s .request (n );
287
+ s .request (requests );
286
288
return ;
287
289
}
288
290
synchronized (this ) {
289
291
if (this .writeSubscriber != null ) {
290
292
if (this .state == State .EMITTING_CACHED_SIGNALS ) {
291
- this .demandBeforeReadyToWrite = n ;
293
+ this .demandBeforeReadyToWrite = requests ;
292
294
return ;
293
295
}
294
296
try {
295
297
this .state = State .EMITTING_CACHED_SIGNALS ;
296
298
if (emitCachedSignals ()) {
297
299
return ;
298
300
}
299
- n = n + this .demandBeforeReadyToWrite - 1 ;
300
- if (n == 0 ) {
301
+ requests = requests + this .demandBeforeReadyToWrite - 1 ;
302
+ if (requests == 0 ) {
301
303
return ;
302
304
}
303
305
}
@@ -306,7 +308,7 @@ public void request(long n) {
306
308
}
307
309
}
308
310
}
309
- s .request (n );
311
+ s .request (requests );
310
312
}
311
313
312
314
private boolean emitCachedSignals () {
@@ -319,10 +321,13 @@ private boolean emitCachedSignals() {
319
321
}
320
322
return true ;
321
323
}
322
- T item = this .item ;
323
- this .item = null ;
324
- if (item != null ) {
325
- requiredWriteSubscriber ().onNext (item );
324
+ T itemToUse ;
325
+ synchronized (this ) {
326
+ itemToUse = this .item ;
327
+ this .item = null ;
328
+ }
329
+ if (itemToUse != null ) {
330
+ requiredWriteSubscriber ().onNext (itemToUse );
326
331
}
327
332
if (this .completed ) {
328
333
requiredWriteSubscriber ().onComplete ();
@@ -347,9 +352,9 @@ public void cancel() {
347
352
348
353
private void releaseCachedItem () {
349
354
synchronized (this ) {
350
- Object item = this .item ;
351
- if (item instanceof DataBuffer ) {
352
- DataBufferUtils .release ((DataBuffer ) item );
355
+ Object itemToRelease = this .item ;
356
+ if (itemToRelease instanceof DataBuffer ) {
357
+ DataBufferUtils .release ((DataBuffer ) itemToRelease );
353
358
}
354
359
this .item = null ;
355
360
}
@@ -451,9 +456,9 @@ public void request(long n) {
451
456
@ Override
452
457
public void cancel () {
453
458
this .writeBarrier .cancel ();
454
- Subscription subscription = this .subscription ;
455
- if (subscription != null ) {
456
- subscription .cancel ();
459
+ Subscription subscriptionToCancel = this .subscription ;
460
+ if (subscriptionToCancel != null ) {
461
+ subscriptionToCancel .cancel ();
457
462
}
458
463
}
459
464
0 commit comments