Skip to content
Open

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package org.elasticsearch.compute.aggregation;
// begin generated imports
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.IntArray;
Expand All @@ -24,6 +25,7 @@ import org.elasticsearch.compute.data.IntArrayBlock;
import org.elasticsearch.compute.data.IntBigArrayBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LocalCircuitBreaker;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
Expand Down Expand Up @@ -84,6 +86,7 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
private ObjectArray<Buffer> buffers;
private final List<Integer> channels;
private final DriverContext driverContext;
private final LocalCircuitBreaker.SingletonService localCircuitBreakerService;
private final BigArrays bigArrays;
private ObjectArray<ReducedState> reducedStates;
private final boolean isRateOverTime;
Expand All @@ -102,12 +105,17 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
) {
this.channels = channels;
this.driverContext = driverContext;
this.bigArrays = driverContext.bigArrays();
localCircuitBreakerService = new LocalCircuitBreaker.SingletonService(
driverContext.bigArrays().breakerService(),
ByteSizeValue.ofKb(8).getBytes(),
ByteSizeValue.ofKb(512).getBytes()
);
this.bigArrays = driverContext.bigArrays().withBreakerService(localCircuitBreakerService);
this.isRateOverTime = isRateOverTime;
ObjectArray<Buffer> buffers = driverContext.bigArrays().newObjectArray(256);
ObjectArray<Buffer> buffers = bigArrays.newObjectArray(256);
this.dateFactor = isDateNanos ? 1_000_000_000.0 : 1000.0;
try {
this.reducedStates = driverContext.bigArrays().newObjectArray(256);
this.reducedStates = bigArrays.newObjectArray(256);
this.buffers = buffers;
buffers = null;
} finally {
Expand Down Expand Up @@ -400,7 +408,7 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
buffer.close();
}
}
Releasables.close(reducedStates, buffers);
Releasables.close(reducedStates, buffers, localCircuitBreakerService);
}

private Buffer getBuffer(int groupId, int newElements, long firstTimestamp) {
Expand Down Expand Up @@ -608,7 +616,7 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
int positionCount = selected.getPositionCount();
try (
var rates = blockFactory.newDoubleBlockBuilder(positionCount);
var flushedStates = new LongObjectPagedHashMap<ReducedState>(positionCount, driverContext.bigArrays())
var flushedStates = new LongObjectPagedHashMap<ReducedState>(positionCount, bigArrays)
) {
for (int p = 0; p < positionCount; p++) {
int group = selected.getInt(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;

import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -173,4 +177,51 @@ public boolean assertEndRunLoop() {
activeThread = null;
return true;
}

/**
* A {@link CircuitBreakerService} that only supports a single {@link LocalCircuitBreaker}.
* Mainly intended to be used with {@link org.elasticsearch.common.util.BigArrays#withBreakerService(CircuitBreakerService)}
*/
public static class SingletonService extends CircuitBreakerService implements Releasable {

private final CircuitBreakerService delegateService;
private final long overReservedBytes;
private final long maxOverReservedBytes;
private LocalCircuitBreaker localCircuitBreaker;

public SingletonService(CircuitBreakerService delegateService, long overReservedBytes, long maxOverReservedBytes) {
this.delegateService = delegateService;
this.overReservedBytes = overReservedBytes;
this.maxOverReservedBytes = maxOverReservedBytes;
}

@Override
public CircuitBreaker getBreaker(String name) {
if (localCircuitBreaker == null) {
this.localCircuitBreaker = new LocalCircuitBreaker(
delegateService.getBreaker(name),
overReservedBytes,
maxOverReservedBytes
);
} else {
assert localCircuitBreaker.getName().equals(name) : "this service only supports a single breaker";
}
return localCircuitBreaker;
}

@Override
public AllCircuitBreakerStats stats() {
throw new UnsupportedOperationException();
}

@Override
public CircuitBreakerStats stats(String name) {
throw new UnsupportedOperationException();
}

@Override
public void close() {
Releasables.close(localCircuitBreaker);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -93,29 +96,60 @@ private TrackingCircuitBreaker newTestBreaker(long limit) {
public void testBasic() {
TrackingCircuitBreaker breaker = newTestBreaker(120);
LocalCircuitBreaker localBreaker = new LocalCircuitBreaker(breaker, 30, 50);
verifyBreaker(localBreaker, breaker);
localBreaker.close();
assertThat(breaker.getUsed(), equalTo(30L));
}

public void testServiceBasic() {
TrackingCircuitBreaker trackingBreaker = newTestBreaker(120);
CircuitBreakerService trackingService = new CircuitBreakerService() {
@Override
public CircuitBreaker getBreaker(String name) {
return trackingBreaker;
}

@Override
public AllCircuitBreakerStats stats() {
throw new UnsupportedOperationException();
}

@Override
public CircuitBreakerStats stats(String name) {
throw new UnsupportedOperationException();
}
};

LocalCircuitBreaker.SingletonService service = new LocalCircuitBreaker.SingletonService(trackingService, 30, 50);
LocalCircuitBreaker localBreaker = (LocalCircuitBreaker) service.getBreaker("foo");
verifyBreaker(localBreaker, trackingBreaker);
service.close();
assertThat(trackingBreaker.getUsed(), equalTo(30L));
}

private static void verifyBreaker(LocalCircuitBreaker localBreaker, TrackingCircuitBreaker trackingBreaker) {
localBreaker.addEstimateBytesAndMaybeBreak(20, "test");
assertThat(localBreaker.getReservedBytes(), equalTo(30L));
assertThat(breaker.callTimes(), equalTo(1));
assertThat(breaker.getUsed(), equalTo(50L));
assertThat(trackingBreaker.callTimes(), equalTo(1));
assertThat(trackingBreaker.getUsed(), equalTo(50L));
localBreaker.addWithoutBreaking(-5);
assertThat(breaker.getUsed(), equalTo(50L));
assertThat(trackingBreaker.getUsed(), equalTo(50L));
assertThat(localBreaker.getReservedBytes(), equalTo(35L));
localBreaker.addEstimateBytesAndMaybeBreak(25, "test");
assertThat(breaker.getUsed(), equalTo(50L));
assertThat(breaker.callTimes(), equalTo(1));
assertThat(trackingBreaker.getUsed(), equalTo(50L));
assertThat(trackingBreaker.callTimes(), equalTo(1));
assertThat(localBreaker.getReservedBytes(), equalTo(10L));
var error = expectThrows(CircuitBreakingException.class, () -> localBreaker.addEstimateBytesAndMaybeBreak(60, "test"));
assertThat(error.getBytesWanted(), equalTo(80L));
assertThat(breaker.callTimes(), equalTo(2));
assertThat(trackingBreaker.callTimes(), equalTo(2));
localBreaker.addEstimateBytesAndMaybeBreak(30, "test");
assertThat(breaker.getUsed(), equalTo(100L));
assertThat(trackingBreaker.getUsed(), equalTo(100L));
assertThat(localBreaker.getReservedBytes(), equalTo(30L));
assertThat(breaker.callTimes(), equalTo(3));
assertThat(trackingBreaker.callTimes(), equalTo(3));
localBreaker.addWithoutBreaking(-40L);
assertThat(breaker.getUsed(), equalTo(80L));
assertThat(trackingBreaker.getUsed(), equalTo(80L));
assertThat(localBreaker.getReservedBytes(), equalTo(50L));
assertThat(breaker.callTimes(), equalTo(4));
localBreaker.close();
assertThat(breaker.getUsed(), equalTo(30L));
assertThat(trackingBreaker.callTimes(), equalTo(4));
}

}