Skip to content

Commit 39663fa

Browse files
avifeneshUbuntu
authored andcommitted
fix(java): enforce immediate timeouts (valkey-io#5264)
--------- Signed-off-by: Avi Fenesh <aviarchi1994@gmail.com> Signed-off-by: Shoham Elias <shohame@amazon.com>
1 parent a82bee5 commit 39663fa

File tree

14 files changed

+865
-190
lines changed

14 files changed

+865
-190
lines changed

java/client/src/main/java/glide/api/BaseClient.java

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,9 @@ protected static CommandManager buildCommandManager(ConnectionManager connection
500500
// We'll update this once the connection provides the native handle
501501
GlideCoreClient core =
502502
new GlideCoreClient(
503-
connectionManager.getNativeClientHandle(), connectionManager.getMaxInflightRequests());
503+
connectionManager.getNativeClientHandle(),
504+
connectionManager.getMaxInflightRequests(),
505+
connectionManager.getRequestTimeoutMs());
504506
// Register for PubSub push delivery
505507
try {
506508
GlideCoreClient.registerClient(connectionManager.getNativeClientHandle(), null);
@@ -2667,13 +2669,14 @@ public CompletableFuture<Map<GlideString, Double>> zpopmin(@NonNull GlideString
26672669
@Override
26682670
public CompletableFuture<Object[]> bzpopmin(@NonNull String[] keys, double timeout) {
26692671
String[] arguments = ArrayUtils.add(keys, Double.toString(timeout));
2670-
return commandManager.submitNewCommand(BZPopMin, arguments, this::handleArrayOrNullResponse);
2672+
return commandManager.submitBlockingCommand(
2673+
BZPopMin, arguments, this::handleArrayOrNullResponse);
26712674
}
26722675

26732676
@Override
26742677
public CompletableFuture<Object[]> bzpopmin(@NonNull GlideString[] keys, double timeout) {
26752678
GlideString[] arguments = ArrayUtils.add(keys, gs(Double.toString(timeout)));
2676-
return commandManager.submitNewCommand(
2679+
return commandManager.submitBlockingCommand(
26772680
BZPopMin, arguments, this::handleArrayOrNullResponseBinary);
26782681
}
26792682

@@ -2705,13 +2708,14 @@ public CompletableFuture<Map<GlideString, Double>> zpopmax(@NonNull GlideString
27052708
@Override
27062709
public CompletableFuture<Object[]> bzpopmax(@NonNull String[] keys, double timeout) {
27072710
String[] arguments = ArrayUtils.add(keys, Double.toString(timeout));
2708-
return commandManager.submitNewCommand(BZPopMax, arguments, this::handleArrayOrNullResponse);
2711+
return commandManager.submitBlockingCommand(
2712+
BZPopMax, arguments, this::handleArrayOrNullResponse);
27092713
}
27102714

27112715
@Override
27122716
public CompletableFuture<Object[]> bzpopmax(@NonNull GlideString[] keys, double timeout) {
27132717
GlideString[] arguments = ArrayUtils.add(keys, gs(Double.toString(timeout)));
2714-
return commandManager.submitNewCommand(
2718+
return commandManager.submitBlockingCommand(
27152719
BZPopMax, arguments, this::handleArrayOrNullResponseBinary);
27162720
}
27172721

@@ -3338,13 +3342,20 @@ public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xr
33383342
public CompletableFuture<Map<String, Map<String, String[][]>>> xread(
33393343
@NonNull Map<String, String> keysAndIds, @NonNull StreamReadOptions options) {
33403344
String[] arguments = options.toArgs(keysAndIds);
3345+
if (options.isBlocking()) {
3346+
return commandManager.submitBlockingCommand(XRead, arguments, this::handleXReadResponse);
3347+
}
33413348
return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponse);
33423349
}
33433350

33443351
@Override
33453352
public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xreadBinary(
33463353
@NonNull Map<GlideString, GlideString> keysAndIds, @NonNull StreamReadOptions options) {
33473354
GlideString[] arguments = options.toArgsBinary(keysAndIds);
3355+
if (options.isBlocking()) {
3356+
return commandManager.submitBlockingCommand(
3357+
XRead, arguments, this::handleXReadResponseBinary);
3358+
}
33483359
return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponseBinary);
33493360
}
33503361

@@ -3614,6 +3625,9 @@ public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
36143625
@NonNull String consumer,
36153626
@NonNull StreamReadGroupOptions options) {
36163627
String[] arguments = options.toArgs(group, consumer, keysAndIds);
3628+
if (options.isBlocking()) {
3629+
return commandManager.submitBlockingCommand(XReadGroup, arguments, this::handleXReadResponse);
3630+
}
36173631
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
36183632
}
36193633

@@ -3624,6 +3638,10 @@ public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xr
36243638
@NonNull GlideString consumer,
36253639
@NonNull StreamReadGroupOptions options) {
36263640
GlideString[] arguments = options.toArgsBinary(group, consumer, keysAndIds);
3641+
if (options.isBlocking()) {
3642+
return commandManager.submitBlockingCommand(
3643+
XReadGroup, arguments, this::handleXReadResponseBinary);
3644+
}
36273645
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponseBinary);
36283646
}
36293647

@@ -4106,14 +4124,14 @@ public CompletableFuture<Long> linsert(
41064124
@Override
41074125
public CompletableFuture<String[]> blpop(@NonNull String[] keys, double timeout) {
41084126
String[] arguments = ArrayUtils.add(keys, Double.toString(timeout));
4109-
return commandManager.submitNewCommand(
4127+
return commandManager.submitBlockingCommand(
41104128
BLPop, arguments, response -> castArray(handleArrayOrNullResponse(response), String.class));
41114129
}
41124130

41134131
@Override
41144132
public CompletableFuture<GlideString[]> blpop(@NonNull GlideString[] keys, double timeout) {
41154133
GlideString[] arguments = ArrayUtils.add(keys, gs(Double.toString(timeout)));
4116-
return commandManager.submitNewCommand(
4134+
return commandManager.submitBlockingCommand(
41174135
BLPop,
41184136
arguments,
41194137
response -> castArray(handleArrayOrNullResponseBinary(response), GlideString.class));
@@ -4122,14 +4140,14 @@ public CompletableFuture<GlideString[]> blpop(@NonNull GlideString[] keys, doubl
41224140
@Override
41234141
public CompletableFuture<String[]> brpop(@NonNull String[] keys, double timeout) {
41244142
String[] arguments = ArrayUtils.add(keys, Double.toString(timeout));
4125-
return commandManager.submitNewCommand(
4143+
return commandManager.submitBlockingCommand(
41264144
BRPop, arguments, response -> castArray(handleArrayOrNullResponse(response), String.class));
41274145
}
41284146

41294147
@Override
41304148
public CompletableFuture<GlideString[]> brpop(@NonNull GlideString[] keys, double timeout) {
41314149
GlideString[] arguments = ArrayUtils.add(keys, gs(Double.toString(timeout)));
4132-
return commandManager.submitNewCommand(
4150+
return commandManager.submitBlockingCommand(
41334151
BRPop,
41344152
arguments,
41354153
response -> castArray(handleArrayOrNullResponseBinary(response), GlideString.class));
@@ -4288,7 +4306,7 @@ public CompletableFuture<Map<String, Object>> bzmpop(
42884306
new String[] {Double.toString(timeout), Integer.toString(keys.length)},
42894307
keys,
42904308
new String[] {modifier.toString()});
4291-
return commandManager.submitNewCommand(
4309+
return commandManager.submitBlockingCommand(
42924310
BZMPop,
42934311
arguments,
42944312
response -> convertKeyValueArrayToMap(handleArrayOrNullResponse(response), Double.class));
@@ -4302,7 +4320,7 @@ public CompletableFuture<Map<GlideString, Object>> bzmpop(
43024320
new GlideString[] {gs(Double.toString(timeout)), gs(Integer.toString(keys.length))},
43034321
keys,
43044322
new GlideString[] {gs(modifier.toString())});
4305-
return commandManager.submitNewCommand(
4323+
return commandManager.submitBlockingCommand(
43064324
BZMPop,
43074325
arguments,
43084326
response ->
@@ -4318,7 +4336,7 @@ public CompletableFuture<Map<String, Object>> bzmpop(
43184336
new String[] {Double.toString(timeout), Integer.toString(keys.length)},
43194337
keys,
43204338
new String[] {modifier.toString(), COUNT_VALKEY_API, Long.toString(count)});
4321-
return commandManager.submitNewCommand(
4339+
return commandManager.submitBlockingCommand(
43224340
BZMPop,
43234341
arguments,
43244342
response -> convertKeyValueArrayToMap(handleArrayOrNullResponse(response), Double.class));
@@ -4334,7 +4352,7 @@ public CompletableFuture<Map<GlideString, Object>> bzmpop(
43344352
new GlideString[] {
43354353
gs(modifier.toString()), gs(COUNT_VALKEY_API), gs(Long.toString(count))
43364354
});
4337-
return commandManager.submitNewCommand(
4355+
return commandManager.submitBlockingCommand(
43384356
BZMPop,
43394357
arguments,
43404358
response ->
@@ -4732,7 +4750,7 @@ public CompletableFuture<Map<String, String[]>> blmpop(
47324750
new String[] {Double.toString(timeout), Long.toString(keys.length)},
47334751
keys,
47344752
new String[] {direction.toString(), COUNT_FOR_LIST_VALKEY_API, Long.toString(count)});
4735-
return commandManager.submitNewCommand(
4753+
return commandManager.submitBlockingCommand(
47364754
BLMPop,
47374755
arguments,
47384756
response -> castMapOfArrays(handleMapOrNullResponse(response), String.class));
@@ -4748,7 +4766,7 @@ public CompletableFuture<Map<GlideString, GlideString[]>> blmpop(
47484766
new GlideString[] {
47494767
gs(direction.toString()), gs(COUNT_FOR_LIST_VALKEY_API), gs(Long.toString(count))
47504768
});
4751-
return commandManager.submitNewCommand(
4769+
return commandManager.submitBlockingCommand(
47524770
BLMPop,
47534771
arguments,
47544772
response ->
@@ -4764,7 +4782,7 @@ public CompletableFuture<Map<String, String[]>> blmpop(
47644782
new String[] {Double.toString(timeout), Long.toString(keys.length)},
47654783
keys,
47664784
new String[] {direction.toString()});
4767-
return commandManager.submitNewCommand(
4785+
return commandManager.submitBlockingCommand(
47684786
BLMPop,
47694787
arguments,
47704788
response -> castMapOfArrays(handleMapOrNullResponse(response), String.class));
@@ -4778,7 +4796,7 @@ public CompletableFuture<Map<GlideString, GlideString[]>> blmpop(
47784796
new GlideString[] {gs(Double.toString(timeout)), gs(Long.toString(keys.length))},
47794797
keys,
47804798
new GlideString[] {gs(direction.toString())});
4781-
return commandManager.submitNewCommand(
4799+
return commandManager.submitBlockingCommand(
47824800
BLMPop,
47834801
arguments,
47844802
response ->
@@ -4832,7 +4850,8 @@ public CompletableFuture<String> blmove(
48324850
new String[] {
48334851
source, destination, wherefrom.toString(), whereto.toString(), Double.toString(timeout)
48344852
};
4835-
return commandManager.submitNewCommand(BLMove, arguments, this::handleStringOrNullResponse);
4853+
return commandManager.submitBlockingCommand(
4854+
BLMove, arguments, this::handleStringOrNullResponse);
48364855
}
48374856

48384857
@Override
@@ -4850,7 +4869,7 @@ public CompletableFuture<GlideString> blmove(
48504869
gs(whereto.toString()),
48514870
gs(Double.toString(timeout))
48524871
};
4853-
return commandManager.submitNewCommand(
4872+
return commandManager.submitBlockingCommand(
48544873
BLMove, arguments, this::handleGlideStringOrNullResponse);
48554874
}
48564875

@@ -5957,7 +5976,7 @@ public CompletableFuture<Object[]> hscan(
59575976

59585977
@Override
59595978
public CompletableFuture<Long> wait(long numreplicas, long timeout) {
5960-
return commandManager.submitNewCommand(
5979+
return commandManager.submitBlockingCommand(
59615980
Wait,
59625981
new String[] {Long.toString(numreplicas), Long.toString(timeout)},
59635982
this::handleLongResponse);

java/client/src/main/java/glide/api/GlideClient.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import static command_request.CommandRequestOuterClass.RequestType.ConfigResetStat;
88
import static command_request.CommandRequestOuterClass.RequestType.ConfigRewrite;
99
import static command_request.CommandRequestOuterClass.RequestType.ConfigSet;
10-
import static command_request.CommandRequestOuterClass.RequestType.CustomCommand;
1110
import static command_request.CommandRequestOuterClass.RequestType.DBSize;
1211
import static command_request.CommandRequestOuterClass.RequestType.Echo;
1312
import static command_request.CommandRequestOuterClass.RequestType.FlushAll;
@@ -135,13 +134,12 @@ public static CompletableFuture<GlideClient> createClient(
135134

136135
@Override
137136
public CompletableFuture<Object> customCommand(@NonNull String[] args) {
138-
return commandManager.submitNewCommand(CustomCommand, args, this::handleObjectOrNullResponse);
137+
return commandManager.submitCustomCommand(args, this::handleObjectOrNullResponse);
139138
}
140139

141140
@Override
142141
public CompletableFuture<Object> customCommand(@NonNull GlideString[] args) {
143-
return commandManager.submitNewCommand(
144-
CustomCommand, args, this::handleBinaryObjectOrNullResponse);
142+
return commandManager.submitCustomCommand(args, this::handleBinaryObjectOrNullResponse);
145143
}
146144

147145
@Deprecated

java/client/src/main/java/glide/api/GlideClusterClient.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import static command_request.CommandRequestOuterClass.RequestType.ConfigResetStat;
88
import static command_request.CommandRequestOuterClass.RequestType.ConfigRewrite;
99
import static command_request.CommandRequestOuterClass.RequestType.ConfigSet;
10-
import static command_request.CommandRequestOuterClass.RequestType.CustomCommand;
1110
import static command_request.CommandRequestOuterClass.RequestType.DBSize;
1211
import static command_request.CommandRequestOuterClass.RequestType.Echo;
1312
import static command_request.CommandRequestOuterClass.RequestType.FCall;
@@ -165,31 +164,29 @@ public static CompletableFuture<GlideClusterClient> createClient(
165164
@Override
166165
public CompletableFuture<ClusterValue<Object>> customCommand(@NonNull String[] args) {
167166
// TODO if a command returns a map as a single value, ClusterValue misleads user
168-
return commandManager.submitNewCommand(
169-
CustomCommand, args, response -> ClusterValue.of(handleObjectOrNullResponse(response)));
167+
return commandManager.submitCustomCommand(
168+
args, response -> ClusterValue.of(handleObjectOrNullResponse(response)));
170169
}
171170

172171
@Override
173172
public CompletableFuture<ClusterValue<Object>> customCommand(@NonNull GlideString[] args) {
174173
// TODO if a command returns a map as a single value, ClusterValue misleads user
175-
return commandManager.submitNewCommand(
176-
CustomCommand,
177-
args,
178-
response -> ClusterValue.of(handleBinaryObjectOrNullResponse(response)));
174+
return commandManager.submitCustomCommand(
175+
args, response -> ClusterValue.of(handleBinaryObjectOrNullResponse(response)));
179176
}
180177

181178
@Override
182179
public CompletableFuture<ClusterValue<Object>> customCommand(
183180
@NonNull String[] args, @NonNull Route route) {
184-
return commandManager.submitNewCommand(
185-
CustomCommand, args, route, response -> handleCustomCommandResponse(route, response));
181+
return commandManager.submitCustomCommand(
182+
args, route, response -> handleCustomCommandResponse(route, response));
186183
}
187184

188185
@Override
189186
public CompletableFuture<ClusterValue<Object>> customCommand(
190187
@NonNull GlideString[] args, @NonNull Route route) {
191-
return commandManager.submitNewCommand(
192-
CustomCommand, args, route, response -> handleCustomCommandBinaryResponse(route, response));
188+
return commandManager.submitCustomCommand(
189+
args, route, response -> handleCustomCommandBinaryResponse(route, response));
193190
}
194191

195192
@SuppressWarnings("unchecked")
@@ -1307,7 +1304,7 @@ public CompletableFuture<GlideString> randomKeyBinary() {
13071304
@Override
13081305
public CompletableFuture<Long> wait(long numreplicas, long timeout) {
13091306
String[] arguments = new String[] {Long.toString(numreplicas), Long.toString(timeout)};
1310-
return commandManager.submitNewCommand(
1307+
return commandManager.submitBlockingCommand(
13111308
Wait, arguments, SimpleSingleNodeRoute.RANDOM, this::handleLongResponse);
13121309
}
13131310

java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ public class StreamReadOptions {
3636
*/
3737
protected Long count;
3838

39+
/**
40+
* Returns true if this options object specifies a BLOCK timeout, making the command a blocking
41+
* command.
42+
*
43+
* <p>Note: We check {@code block != null} rather than {@code block != null && block != 0} because
44+
* {@code BLOCK 0} means "block indefinitely" in Valkey/Redis, which is still a blocking command
45+
* that should skip Java-side timeout enforcement.
46+
*
47+
* @return true if BLOCK option is set (including BLOCK 0 for indefinite blocking)
48+
*/
49+
public boolean isBlocking() {
50+
return this.block != null;
51+
}
52+
3953
/**
4054
* Converts options and the key-to-id input for {@link StreamBaseCommands#xread(Map,
4155
* StreamReadOptions)} into a String[].

java/client/src/main/java/glide/ffi/resolvers/NativeUtils.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class NativeUtils {
2929
public static final String NATIVE_FOLDER_PATH_PREFIX = "nativeutils";
3030

3131
/** Temporary directory which will contain the dynamic library files. */
32-
private static File temporaryDir;
32+
private static volatile File temporaryDir;
3333

3434
/** Track if the Glide library has already been loaded */
3535
private static volatile boolean glideLibLoaded = false;
@@ -95,12 +95,17 @@ public static void loadLibraryFromJar(String path) throws IOException {
9595
}
9696

9797
// Prepare temporary file
98-
if (temporaryDir == null) {
99-
temporaryDir = createTempDirectory(NATIVE_FOLDER_PATH_PREFIX);
100-
temporaryDir.deleteOnExit();
98+
File localTempDir;
99+
synchronized (NativeUtils.class) {
100+
if (temporaryDir == null) {
101+
File createdDir = createTempDirectory(NATIVE_FOLDER_PATH_PREFIX);
102+
createdDir.deleteOnExit();
103+
temporaryDir = createdDir;
104+
}
105+
localTempDir = temporaryDir;
101106
}
102107

103-
File temp = new File(temporaryDir, filename);
108+
File temp = new File(localTempDir, filename);
104109

105110
try (InputStream is = NativeUtils.class.getResourceAsStream(path)) {
106111
if (is == null) {

0 commit comments

Comments
 (0)