Skip to content

Commit 08c2a9d

Browse files
feature[PanamaUring]
1,实现probe以及添加默认操作符合法性校验 2,修复一个潜在的wakeup失败的问题
1 parent a04ec1b commit 08c2a9d

11 files changed

Lines changed: 239 additions & 70 deletions

File tree

panama-uring/src/main/java/top/dreamlike/panama/uring/async/fd/IoUringSelectedReadableFd.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.concurrent.CompletableFuture;
2121

2222
public interface IoUringSelectedReadableFd extends IoUringOperator, NativeFd {
23-
2423
IoUringBufferRing bufferRing();
2524

2625
default CancelableFuture<OwnershipMemory> asyncSelectedRead(int len, int offset) {
@@ -30,7 +29,7 @@ default CancelableFuture<OwnershipMemory> asyncSelectedRead(int len, int offset)
3029
sqe.setFlags((byte) (sqe.getFlags() | IoUringConstant.IOSQE_BUFFER_SELECT));
3130
sqe.setBufGroup(bufferRing.getBufferGroupId());
3231
})
33-
.thenCompose(cqe -> {
32+
.thenComposeAsync(cqe -> {
3433
int syscallResult = cqe.getRes();
3534
if (syscallResult < 0) {
3635
return CompletableFuture.failedFuture(new SyscallException(syscallResult));
@@ -40,7 +39,7 @@ default CancelableFuture<OwnershipMemory> asyncSelectedRead(int len, int offset)
4039
IoUringBufferRingElement ringElement = bufferRing.removeBuffer(bid).resultNow();
4140
return CompletableFuture.completedFuture(borrowUringBufferRingElement(ringElement, readLen));
4241
}
43-
});
42+
}, r -> owner().runOnEventLoop(r));
4443
}
4544

4645
default CancelableFuture<IoUringSyscallResult<OwnershipMemory>> asyncSelectedReadResult(int len, int offset) {
@@ -50,7 +49,8 @@ default CancelableFuture<IoUringSyscallResult<OwnershipMemory>> asyncSelectedRea
5049
sqe.setFlags((byte) (sqe.getFlags() | IoUringConstant.IOSQE_BUFFER_SELECT));
5150
sqe.setBufGroup(bufferRing.getBufferGroupId());
5251
})
53-
.thenApply(cqe -> {
52+
.thenApplyAsync(cqe -> {
53+
//强制制定在eventloop上 防止外部get导致切换线程
5454
IoUringSyscallResult<OwnershipMemory> result;
5555
if (cqe.getRes() < 0) {
5656
result = new IoUringSyscallResult<>(cqe.getRes(), OwnershipMemory.of(MemorySegment.NULL));
@@ -61,7 +61,7 @@ default CancelableFuture<IoUringSyscallResult<OwnershipMemory>> asyncSelectedRea
6161
result = new IoUringSyscallResult<>(cqe.getRes(), borrowUringBufferRingElement(ringElement, readLen));
6262
}
6363
return result;
64-
});
64+
}, r -> owner().runOnEventLoop(r));
6565

6666
}
6767

@@ -97,7 +97,7 @@ public MemorySegment resource() {
9797

9898
@Override
9999
public void drop() {
100-
IoUringBufferRingElement waitToRelease = (IoUringBufferRingElement) ELEMENT_VH.compareAndExchange(this, element, (IoUringBufferRingElement)null);
100+
IoUringBufferRingElement waitToRelease = (IoUringBufferRingElement) ELEMENT_VH.compareAndExchange(this, element, (IoUringBufferRingElement) null);
101101
if (waitToRelease == null) {
102102
return;
103103
}

panama-uring/src/main/java/top/dreamlike/panama/uring/eventloop/IoUringEventLoop.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import top.dreamlike.panama.uring.nativelib.Instance;
1111
import top.dreamlike.panama.uring.nativelib.exception.SyscallException;
1212
import top.dreamlike.panama.uring.nativelib.helper.NativeHelper;
13+
import top.dreamlike.panama.uring.nativelib.helper.OSIoUringProbe;
1314
import top.dreamlike.panama.uring.nativelib.libs.LibUring;
1415
import top.dreamlike.panama.uring.nativelib.struct.liburing.*;
1516
import top.dreamlike.panama.uring.nativelib.struct.time.KernelTime64Type;
@@ -37,6 +38,8 @@
3738

3839
public class IoUringEventLoop extends Thread implements AutoCloseable, Executor {
3940

41+
private static final OSIoUringProbe PROBE = new OSIoUringProbe();
42+
4043
private static final AtomicInteger count = new AtomicInteger(0);
4144
private static final Logger log = LogManager.getLogger(IoUringEventLoop.class);
4245

@@ -108,7 +111,6 @@ private void initWakeUpFdMultiShot() {
108111
@Override
109112
public void run() {
110113
while (!hasClosed.get()) {
111-
inWait.set(true);
112114
while (true) {
113115
ScheduledTask next = scheduledTasks.peek();
114116
if (next != null && next.deadlineNanos <= System.nanoTime()) {
@@ -132,7 +134,7 @@ public void run() {
132134
kernelTime64Type.setTv_nsec(duration % 1000000000);
133135
libUring.io_uring_submit_and_wait_timeout(internalRing, cqePtrs, cqeSize, kernelTime64Type, null);
134136
}
135-
137+
inWait.set(true);
136138
processCqes();
137139
}
138140
releaseResource();
@@ -167,7 +169,7 @@ public <V> CompletableFuture<V> runOnEventLoop(Supplier<V> callable) {
167169
return future;
168170
}
169171

170-
private void runOnEventLoop(Runnable runnable) {
172+
public void runOnEventLoop(Runnable runnable) {
171173
if (Thread.currentThread() == this) {
172174
runWithCatchException(runnable);
173175
} else {
@@ -212,6 +214,10 @@ private CancelToken fillTemplate(Consumer<IoUringSqe> sqeFunction, Consumer<IoUr
212214
Runnable r = () -> {
213215
IoUringSqe sqe = ioUringGetSqe();
214216
sqeFunction.accept(sqe);
217+
if (NativeHelper.enableOpVersionCheck && sqe.getOpcode() > PROBE.getLastOp()) {
218+
Instance.LIB_URING.io_uring_back_sqe(internalRing);
219+
throw new UnsupportedOperationException(sqe.getOpcode() + " is unsupported");
220+
}
215221
sqe.setUser_data(token);
216222
callBackMap.put(token, new IoUringCompletionCallBack(sqe.getFd(), sqe.getOpcode(), callback));
217223
if (needSubmit) {
@@ -251,6 +257,12 @@ public CancelToken asyncOperation(Consumer<IoUringSqe> sqeFunction, Consumer<IoU
251257
Runnable r = () -> {
252258
IoUringSqe sqe = ioUringGetSqe();
253259
sqeFunction.accept(sqe);
260+
261+
if (NativeHelper.enableOpVersionCheck && sqe.getOpcode() > PROBE.getLastOp()) {
262+
Instance.LIB_URING.io_uring_back_sqe(internalRing);
263+
throw new UnsupportedOperationException(sqe.getOpcode() + " is unsupported");
264+
}
265+
254266
sqe.setUser_data(token);
255267
if (enableLink) {
256268
sqe.setFlags((byte) (sqe.getFlags() | IoUringConstant.IOSQE_IO_LINK));

panama-uring/src/main/java/top/dreamlike/panama/uring/nativelib/Instance.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package top.dreamlike.panama.uring.nativelib;
2+
23
import top.dreamlike.panama.generator.proxy.NativeCallGenerator;
34
import top.dreamlike.panama.generator.proxy.StructProxyGenerator;
4-
import top.dreamlike.panama.uring.nativelib.helper.NativeHelper;
55
import top.dreamlike.panama.uring.nativelib.libs.LibEpoll;
66
import top.dreamlike.panama.uring.nativelib.libs.LibJemalloc;
77
import top.dreamlike.panama.uring.nativelib.libs.LibUring;
@@ -20,6 +20,7 @@ public class Instance {
2020
public static final LibJemalloc LIB_JEMALLOC = new LibJemalloc() {
2121

2222
private static final LibJemalloc FFI = NATIVE_CALL_GENERATOR.generate(LibJemalloc.class);
23+
2324
@Override
2425
public MemorySegment malloc(long size) {
2526
return FFI.malloc(size).reinterpret(size);
@@ -43,12 +44,7 @@ public int posix_memalign(MemorySegment memptr, long alignment, long size) {
4344

4445
static {
4546
NATIVE_CALL_GENERATOR.indyMode();
46-
LibUring generate = NATIVE_CALL_GENERATOR.generate(LibUring.class);
47-
if (Boolean.parseBoolean(System.getProperty("enable-detect-os-version", "false"))) {
48-
LIB_URING = NativeHelper.enhanceCheck(generate, LibUring.class);
49-
} else {
50-
LIB_URING = generate;
51-
}
47+
LIB_URING = NATIVE_CALL_GENERATOR.generate(LibUring.class);
5248
}
5349

5450

panama-uring/src/main/java/top/dreamlike/panama/uring/nativelib/helper/NativeHelper.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,23 @@
66
import top.dreamlike.panama.uring.eventloop.IoUringEventLoop;
77
import top.dreamlike.panama.uring.helper.LambdaHelper;
88
import top.dreamlike.panama.uring.nativelib.Instance;
9-
import top.dreamlike.panama.uring.nativelib.exception.ErrorKernelVersionException;
109
import top.dreamlike.panama.uring.nativelib.exception.SyscallException;
1110
import top.dreamlike.panama.uring.trait.OwnershipResource;
1211

1312
import java.lang.foreign.MemorySegment;
1413
import java.lang.foreign.ValueLayout;
1514
import java.lang.invoke.VarHandle;
1615
import java.lang.reflect.Field;
17-
import java.lang.reflect.Method;
18-
import java.lang.reflect.Proxy;
1916
import java.nio.ByteOrder;
2017
import java.util.List;
21-
import java.util.Optional;
2218
import java.util.function.IntSupplier;
2319

2420
public class NativeHelper {
2521

2622
public static String JAVA_IO_TMPDIR = System.getProperty("java.io.tmpdir");
2723

24+
public static boolean enableOpVersionCheck = System.getProperty("enable-detect-os-version", "true").equalsIgnoreCase("true");
25+
2826
private static final Logger logger = LogManager.getLogger(NativeHelper.class);
2927

3028
private static final String[] errStr = new String[257];
@@ -116,25 +114,6 @@ public static <T> void dropBatch(List<OwnershipResource<T>> memories) {
116114
}
117115
}
118116

119-
120-
public static <T> T enhanceCheck(T afterProxy, Class<T> nativeInterface) {
121-
return (T) Proxy.newProxyInstance(nativeInterface.getClassLoader(), new Class[]{nativeInterface}, (Object proxy, Method method, Object[] args) -> {
122-
KernelVersionLimit annotation = Optional.ofNullable(method.getAnnotation(KernelVersionLimit.class)).orElse(nativeInterface.getAnnotation(KernelVersionLimit.class));
123-
if (annotation != null) {
124-
if (!osLinux) {
125-
logger.error("This method is only supported on Linux");
126-
throw new ErrorKernelVersionException();
127-
}
128-
if (!allowCurrentLinuxVersion(annotation.major(), annotation.minor())) {
129-
logger.error("This method is only supported on Linux kernel version {}.{}, current version is {}.{}",
130-
annotation.major(), annotation.minor(), currentLinuxMajor, currentLinuxMinor);
131-
throw new ErrorKernelVersionException(annotation.major(), annotation.minor());
132-
}
133-
}
134-
return method.invoke(afterProxy, args);
135-
});
136-
}
137-
138117
public static boolean inSameEventLoop(IoUringEventLoop eventLoop, Object o) {
139118
if (isSkipSameEventLoopCheck) {
140119
return true;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package top.dreamlike.panama.uring.nativelib.helper;
2+
3+
import top.dreamlike.panama.generator.proxy.NativeArray;
4+
import top.dreamlike.panama.generator.proxy.StructProxyGenerator;
5+
import top.dreamlike.panama.uring.nativelib.Instance;
6+
7+
import java.lang.foreign.MemorySegment;
8+
9+
public class OSIoUringProbe {
10+
11+
private final int lastOp;
12+
13+
private final IoUringProbeOp[] ops;
14+
15+
public OSIoUringProbe() {
16+
var probe = Instance.LIB_URING.io_uring_get_probe();
17+
if (probe == null) {
18+
throw new RuntimeException("Failed to get probe");
19+
}
20+
lastOp = probe.getLastOp();
21+
byte len = probe.getOpsLen();
22+
ops = new IoUringProbeOp[len];
23+
MemorySegment opsBase = StructProxyGenerator.findMemorySegment(probe)
24+
.asSlice(top.dreamlike.panama.uring.nativelib.struct.liburing.IoUringProbe.OPS_OFFSET)
25+
.reinterpret(len * top.dreamlike.panama.uring.nativelib.struct.liburing.IoUringProbeOp.LAYOUT.byteSize());
26+
27+
NativeArray<top.dreamlike.panama.uring.nativelib.struct.liburing.IoUringProbeOp> ops = Instance.STRUCT_PROXY_GENERATOR.enhanceArray(opsBase);
28+
for (byte i = 0; i < len; i++) {
29+
top.dreamlike.panama.uring.nativelib.struct.liburing.IoUringProbeOp op = ops.get(i);
30+
this.ops[i] = new IoUringProbeOp(op.getOp(), op.getFlags());
31+
}
32+
33+
Instance.LIB_URING.io_uring_free_probe(probe);
34+
}
35+
36+
public int getLastOp() {
37+
return lastOp;
38+
}
39+
40+
public IoUringProbeOp[] getOps() {
41+
return ops;
42+
}
43+
44+
public record IoUringProbeOp(byte op, short flags) {
45+
}
46+
}

panama-uring/src/main/java/top/dreamlike/panama/uring/nativelib/libs/LibUring.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,13 @@
2727
@CLib("liburing-ffi.so")
2828
@KernelVersionLimit(major = 5, minor = 10)
2929
public interface LibUring {
30-
//跟队列本身相关的操作
3130

31+
@NativeFunction(returnIsPointer = true)
32+
IoUringProbe io_uring_get_probe();
33+
34+
void io_uring_free_probe(@Pointer IoUringProbe probe);
35+
36+
//跟队列本身相关的操作=
3237
int io_uring_queue_init(int entries, @Pointer IoUring ring, int flags);
3338

3439
int io_uring_queue_init_params(int entries, @Pointer IoUring ring, @Pointer IoUringParams p);
@@ -204,6 +209,13 @@ default IoUringSqe io_uring_get_sqe(@Pointer IoUring ring) {
204209
return Instance.STRUCT_PROXY_GENERATOR.enhance(currentSqe);
205210
}
206211

212+
@NativeFunction(fast = true)
213+
default void io_uring_back_sqe(@Pointer IoUring ring) {
214+
MemorySegment realMemory = StructProxyGenerator.findMemorySegment(ring);
215+
int tail = (int) IoUringConstant.AccessShortcuts.IO_URING_SQ_SQE_TAIL_VARHANDLE.get(realMemory, 0L);
216+
IoUringConstant.AccessShortcuts.IO_URING_SQ_SQE_TAIL_VARHANDLE.set(realMemory, 0L, tail - 1);
217+
}
218+
207219
default void io_uring_prep_rw(int opcode, @Pointer IoUringSqe sqe, int fd, MemorySegment addr, int len, long offset) {
208220
if (!StructProxyGenerator.isNativeStruct(sqe)) {
209221
throw new StructException("sqe is not struct,pleace call StructProxyGenerator::enhance before calling native function");
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package top.dreamlike.panama.uring.nativelib.struct.liburing;
2+
3+
import top.dreamlike.panama.generator.annotation.NativeArrayMark;
4+
import top.dreamlike.panama.uring.nativelib.Instance;
5+
6+
import java.lang.foreign.MemoryLayout;
7+
import java.lang.foreign.MemorySegment;
8+
9+
public class IoUringProbe {
10+
11+
public static final MemoryLayout LAYOUT = Instance.STRUCT_PROXY_GENERATOR.extract(IoUringProbe.class);
12+
13+
public static final long OPS_OFFSET = LAYOUT.byteOffset(MemoryLayout.PathElement.groupElement("ops"));
14+
15+
private byte lastOp;
16+
17+
private byte opsLen;
18+
19+
private short resv;
20+
21+
@NativeArrayMark(size = int.class, length = 3)
22+
private MemorySegment resv2;
23+
24+
@NativeArrayMark(size = IoUringProbeOp.class, length = 0)
25+
private MemorySegment ops;
26+
27+
public byte getLastOp() {
28+
return lastOp;
29+
}
30+
31+
public void setLastOp(byte lastOp) {
32+
this.lastOp = lastOp;
33+
}
34+
35+
public byte getOpsLen() {
36+
return opsLen;
37+
}
38+
39+
public void setOpsLen(byte opsLen) {
40+
this.opsLen = opsLen;
41+
}
42+
43+
public short getResv() {
44+
return resv;
45+
}
46+
47+
public void setResv(short resv) {
48+
this.resv = resv;
49+
}
50+
51+
public MemorySegment getResv2() {
52+
return resv2;
53+
}
54+
55+
public void setResv2(MemorySegment resv2) {
56+
this.resv2 = resv2;
57+
}
58+
59+
public MemorySegment getOps() {
60+
return ops;
61+
}
62+
63+
public void setOps(MemorySegment ops) {
64+
this.ops = ops;
65+
}
66+
}

0 commit comments

Comments
 (0)