Skip to content

Commit 44a9284

Browse files
authored
Correct config value parsing for decrease value to use double (#2893)
1 parent fe3474b commit 44a9284

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/ThrottlingInvoker.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ public class ThrottlingInvoker
4848
private static final Logger logger = LoggerFactory.getLogger(ThrottlingInvoker.class);
4949

5050
//Controls the delay applied between calls at the initial occurrence of Congestion.
51-
private static final String THROTTLE_INITIAL_DELAY_MS = "throttle_initial_delay_ms";
51+
public static final String THROTTLE_INITIAL_DELAY_MS = "throttle_initial_delay_ms";
5252
//The max milliseconds to wait between calls in periods of high congestion.
53-
private static final String THROTTLE_MAX_DELAY_MS = "throttle_max_delay_ms";
53+
public static final String THROTTLE_MAX_DELAY_MS = "throttle_max_delay_ms";
5454
//The multiplicative factor by which we should decrease our call rate (e.g. increase delay) when congestion occurs.
55-
private static final String THROTTLE_DECREASE_FACTOR = "throttle_decrease_factor";
55+
public static final String THROTTLE_DECREASE_FACTOR = "throttle_decrease_factor";
5656
//The additive factor by which we should increase our call rate (e.g. decrease delay) when we seem free of congestion.
57-
private static final String THROTTLE_INCREASE_MS = "throttle_increase_ms";
57+
public static final String THROTTLE_INCREASE_MS = "throttle_increase_ms";
5858

5959
//10ms is our initial delay, this takes us from unlimited TPS to 100 TPS as a first step.
6060
private static final long DEFAULT_INITIAL_DELAY_MS = 10;
@@ -143,7 +143,7 @@ public static Builder newDefaultBuilder(ExceptionFilter filter, java.util.Map<St
143143
long maxDelayMs = (configOptions.get(THROTTLE_MAX_DELAY_MS) != null) ?
144144
Long.parseLong(configOptions.get(THROTTLE_MAX_DELAY_MS)) : DEFAULT_MAX_DELAY_MS;
145145
double decreaseFactor = (configOptions.get(THROTTLE_DECREASE_FACTOR) != null) ?
146-
Long.parseLong(configOptions.get(THROTTLE_DECREASE_FACTOR)) : DEFAULT_DECREASE_FACTOR;
146+
Double.parseDouble(configOptions.get(THROTTLE_DECREASE_FACTOR)) : DEFAULT_DECREASE_FACTOR;
147147
long increase = (configOptions.get(THROTTLE_INCREASE_MS) != null) ?
148148
Long.parseLong(configOptions.get(THROTTLE_INCREASE_MS)) : DEFAULT_INCREASE_MS;
149149

@@ -227,6 +227,26 @@ long getDelay()
227227
return delay.get();
228228
}
229229

230+
public double getDecrease()
231+
{
232+
return decrease;
233+
}
234+
235+
public long getDefaultInitialDelayMs()
236+
{
237+
return initialDelayMs;
238+
}
239+
240+
public long getMaxDelayMs()
241+
{
242+
return maxDelayMs;
243+
}
244+
245+
public long getIncrease()
246+
{
247+
return increase;
248+
}
249+
230250
@Override
231251
public String toString()
232252
{

athena-federation-sdk/src/test/java/com/amazonaws/athena/connector/lambda/ThrottlingInvokerTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,14 @@
2626
import software.amazon.awssdk.services.glue.model.ErrorDetails;
2727
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;
2828

29+
import java.util.Map;
2930
import java.util.concurrent.TimeoutException;
3031
import java.util.concurrent.atomic.AtomicLong;
3132

33+
import static com.amazonaws.athena.connector.lambda.ThrottlingInvoker.THROTTLE_DECREASE_FACTOR;
34+
import static com.amazonaws.athena.connector.lambda.ThrottlingInvoker.THROTTLE_INCREASE_MS;
35+
import static com.amazonaws.athena.connector.lambda.ThrottlingInvoker.THROTTLE_INITIAL_DELAY_MS;
36+
import static com.amazonaws.athena.connector.lambda.ThrottlingInvoker.THROTTLE_MAX_DELAY_MS;
3237
import static org.junit.Assert.assertEquals;
3338
import static org.junit.Assert.assertTrue;
3439
import static org.mockito.Mockito.mock;
@@ -122,4 +127,21 @@ public void invokeWithThrottleNoSpill()
122127
when(spiller.spilled()).thenReturn(false);
123128
invoker.invoke(() -> {throw new AthenaConnectorException("Throttling error", ErrorDetails.builder().errorCode(FederationSourceErrorCode.THROTTLING_EXCEPTION.toString()).build());}, 2_000);
124129
}
130+
131+
@Test
132+
public void testDefaultBuilderFromConfig()
133+
{
134+
java.util.Map<String, String> configOption = Map.of(THROTTLE_INITIAL_DELAY_MS, "100",
135+
THROTTLE_MAX_DELAY_MS, "10000",
136+
THROTTLE_DECREASE_FACTOR, "0.1",
137+
THROTTLE_INCREASE_MS, "500") ;
138+
ThrottlingInvoker.Builder builder = ThrottlingInvoker.newDefaultBuilder((Exception ex) -> ex instanceof FederationThrottleException, configOption);
139+
140+
ThrottlingInvoker invoker = builder.build();
141+
142+
assertTrue(invoker.getDecrease() == 0.1);
143+
assertEquals(invoker.getIncrease(), 500);
144+
assertEquals(invoker.getDefaultInitialDelayMs(), 100);
145+
assertEquals(invoker.getMaxDelayMs(), 10000);
146+
}
125147
}

0 commit comments

Comments
 (0)