Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public class ThrottlingInvoker
private static final Logger logger = LoggerFactory.getLogger(ThrottlingInvoker.class);

//Controls the delay applied between calls at the initial occurrence of Congestion.
private static final String THROTTLE_INITIAL_DELAY_MS = "throttle_initial_delay_ms";
public static final String THROTTLE_INITIAL_DELAY_MS = "throttle_initial_delay_ms";
//The max milliseconds to wait between calls in periods of high congestion.
private static final String THROTTLE_MAX_DELAY_MS = "throttle_max_delay_ms";
public static final String THROTTLE_MAX_DELAY_MS = "throttle_max_delay_ms";
//The multiplicative factor by which we should decrease our call rate (e.g. increase delay) when congestion occurs.
private static final String THROTTLE_DECREASE_FACTOR = "throttle_decrease_factor";
public static final String THROTTLE_DECREASE_FACTOR = "throttle_decrease_factor";
//The additive factor by which we should increase our call rate (e.g. decrease delay) when we seem free of congestion.
private static final String THROTTLE_INCREASE_MS = "throttle_increase_ms";
public static final String THROTTLE_INCREASE_MS = "throttle_increase_ms";

//10ms is our initial delay, this takes us from unlimited TPS to 100 TPS as a first step.
private static final long DEFAULT_INITIAL_DELAY_MS = 10;
Expand Down Expand Up @@ -143,7 +143,7 @@ public static Builder newDefaultBuilder(ExceptionFilter filter, java.util.Map<St
long maxDelayMs = (configOptions.get(THROTTLE_MAX_DELAY_MS) != null) ?
Long.parseLong(configOptions.get(THROTTLE_MAX_DELAY_MS)) : DEFAULT_MAX_DELAY_MS;
double decreaseFactor = (configOptions.get(THROTTLE_DECREASE_FACTOR) != null) ?
Long.parseLong(configOptions.get(THROTTLE_DECREASE_FACTOR)) : DEFAULT_DECREASE_FACTOR;
Double.parseDouble(configOptions.get(THROTTLE_DECREASE_FACTOR)) : DEFAULT_DECREASE_FACTOR;
long increase = (configOptions.get(THROTTLE_INCREASE_MS) != null) ?
Long.parseLong(configOptions.get(THROTTLE_INCREASE_MS)) : DEFAULT_INCREASE_MS;

Expand Down Expand Up @@ -227,6 +227,26 @@ long getDelay()
return delay.get();
}

public double getDecrease()
{
return decrease;
}

public long getDefaultInitialDelayMs()
{
return initialDelayMs;
}

public long getMaxDelayMs()
{
return maxDelayMs;
}

public long getIncrease()
{
return increase;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import static com.amazonaws.athena.connector.lambda.ThrottlingInvoker.THROTTLE_DECREASE_FACTOR;
import static com.amazonaws.athena.connector.lambda.ThrottlingInvoker.THROTTLE_INCREASE_MS;
import static com.amazonaws.athena.connector.lambda.ThrottlingInvoker.THROTTLE_INITIAL_DELAY_MS;
import static com.amazonaws.athena.connector.lambda.ThrottlingInvoker.THROTTLE_MAX_DELAY_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -122,4 +127,21 @@ public void invokeWithThrottleNoSpill()
when(spiller.spilled()).thenReturn(false);
invoker.invoke(() -> {throw new AthenaConnectorException("Throttling error", ErrorDetails.builder().errorCode(FederationSourceErrorCode.THROTTLING_EXCEPTION.toString()).build());}, 2_000);
}

@Test
public void testDefaultBuilderFromConfig()
{
java.util.Map<String, String> configOption = Map.of(THROTTLE_INITIAL_DELAY_MS, "100",
THROTTLE_MAX_DELAY_MS, "10000",
THROTTLE_DECREASE_FACTOR, "0.1",
THROTTLE_INCREASE_MS, "500") ;
ThrottlingInvoker.Builder builder = ThrottlingInvoker.newDefaultBuilder((Exception ex) -> ex instanceof FederationThrottleException, configOption);

ThrottlingInvoker invoker = builder.build();

assertTrue(invoker.getDecrease() == 0.1);
assertEquals(invoker.getIncrease(), 500);
assertEquals(invoker.getDefaultInitialDelayMs(), 100);
assertEquals(invoker.getMaxDelayMs(), 10000);
}
}