Skip to content

Commit 7eabdf7

Browse files
authored
Merge pull request #5173 from hfu5/sideband
Add SidebandMultiThreadClientTest (to release6.3)
2 parents 871b098 + 71297ee commit 7eabdf7

File tree

2 files changed

+144
-0
lines changed

2 files changed

+144
-0
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package com.apple.foundationdb;
2+
3+
import com.apple.foundationdb.tuple.Tuple;
4+
5+
import java.util.Collection;
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import java.util.concurrent.BlockingQueue;
9+
import java.util.concurrent.LinkedBlockingQueue;
10+
import java.util.concurrent.ThreadLocalRandom;
11+
12+
import org.junit.jupiter.api.Assertions;
13+
14+
/**
15+
* Each cluster has a queue, producer writes a key and then send a message to this queue in JVM.
16+
* Consumer would consume the key by checking the existence of the key, if it does not find the key,
17+
* then the test would fail.
18+
*
19+
* This test is to verify the causal consistency of transactions for mutli-threaded client.
20+
*/
21+
public class SidebandMultiThreadClientTest {
22+
public static final MultiClientHelper clientHelper = new MultiClientHelper();
23+
24+
private static final Map<Database, BlockingQueue<String>> db2Queues = new HashMap<>();
25+
private static final int threadPerDB = 5;
26+
private static final int txnCnt = 1000;
27+
28+
public static void main(String[] args) throws Exception {
29+
FDB fdb = FDB.selectAPIVersion(630);
30+
setupThreads(fdb);
31+
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
32+
for (Database db : dbs) {
33+
db2Queues.put(db, new LinkedBlockingQueue<>());
34+
}
35+
System.out.println("Start processing and validating");
36+
process(dbs);
37+
check(dbs);
38+
System.out.println("Test finished");
39+
}
40+
41+
private static synchronized void setupThreads(FDB fdb) {
42+
int clientThreadsPerVersion = clientHelper.readClusterFromEnv().length;
43+
fdb.options().setClientThreadsPerVersion(clientThreadsPerVersion);
44+
System.out.printf("thread per version is %d\n", clientThreadsPerVersion);
45+
fdb.options().setExternalClientDirectory("/var/dynamic-conf/lib");
46+
fdb.options().setTraceEnable("/tmp");
47+
fdb.options().setKnob("min_trace_severity=5");
48+
}
49+
50+
private static void process(Collection<Database> dbs) {
51+
for (Database db : dbs) {
52+
for (int i = 0; i < threadPerDB; i++) {
53+
final Thread thread = new Thread(Producer.create(db, db2Queues.get(db)));
54+
thread.start();
55+
}
56+
}
57+
}
58+
59+
private static void check(Collection<Database> dbs) throws InterruptedException {
60+
final Map<Thread, Consumer> threads2Consumers = new HashMap<>();
61+
for (Database db : dbs) {
62+
for (int i = 0; i < threadPerDB; i++) {
63+
final Consumer consumer = Consumer.create(db, db2Queues.get(db));
64+
final Thread thread = new Thread(consumer);
65+
thread.start();
66+
threads2Consumers.put(thread, consumer);
67+
}
68+
}
69+
70+
for (Map.Entry<Thread, Consumer> entry : threads2Consumers.entrySet()) {
71+
entry.getKey().join();
72+
final boolean succeed = entry.getValue().succeed;
73+
Assertions.assertTrue(succeed, "Sideband test failed");
74+
}
75+
}
76+
77+
public static class Producer implements Runnable {
78+
private final Database db;
79+
private final BlockingQueue<String> queue;
80+
81+
private Producer(Database db, BlockingQueue<String> queue) {
82+
this.db = db;
83+
this.queue = queue;
84+
}
85+
86+
public static Producer create(Database db, BlockingQueue<String> queue) {
87+
return new Producer(db, queue);
88+
}
89+
90+
@Override
91+
public void run() {
92+
for (int i = 0; i < txnCnt; i++) {
93+
final long suffix = ThreadLocalRandom.current().nextLong();
94+
final String key = String.format("Sideband/Multithread/Test/%d", suffix);
95+
db.run(tr -> {
96+
tr.set(Tuple.from(key).pack(), Tuple.from("bar").pack());
97+
return null;
98+
});
99+
queue.offer(key);
100+
}
101+
}
102+
}
103+
104+
public static class Consumer implements Runnable {
105+
private final Database db;
106+
private final BlockingQueue<String> queue;
107+
private boolean succeed;
108+
109+
private Consumer(Database db, BlockingQueue<String> queue) {
110+
this.db = db;
111+
this.queue = queue;
112+
this.succeed = true;
113+
}
114+
115+
public static Consumer create(Database db, BlockingQueue<String> queue) {
116+
return new Consumer(db, queue);
117+
}
118+
119+
@Override
120+
public void run() {
121+
try {
122+
for (int i = 0; i < txnCnt && succeed; i++) {
123+
final String key = queue.take();
124+
db.run(tr -> {
125+
byte[] result = tr.get(Tuple.from(key).pack()).join();
126+
if (result == null) {
127+
System.out.println("FAILED to get key " + key + " from DB " + db);
128+
succeed = false;
129+
}
130+
if (!succeed) {
131+
return null;
132+
}
133+
String value = Tuple.fromBytes(result).getString(0);
134+
return null;
135+
});
136+
}
137+
} catch (InterruptedException e) {
138+
System.out.println("Get Exception in consumer: " + e);
139+
succeed = false;
140+
}
141+
}
142+
}
143+
}

bindings/java/src/tests.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ set(JAVA_INTEGRATION_TESTS
4949
src/integration/com/apple/foundationdb/RangeQueryIntegrationTest.java
5050
src/integration/com/apple/foundationdb/BasicMultiClientIntegrationTest.java
5151
src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java
52+
src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java
5253
)
5354

5455
# Resources that are used in integration testing, but are not explicitly test files (JUnit rules,

0 commit comments

Comments
 (0)