Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.concurrent.ThreadFactory;

import io.netty5.channel.Channel;
import io.netty5.channel.EventLoop;
import io.netty5.channel.EventLoopGroup;
import io.netty5.channel.ServerChannel;

/**
* An {@link EventLoopGroup} with associated {@link io.netty5.channel.Channel} factory.
Expand All @@ -27,12 +29,13 @@
*/
interface DefaultLoop {

<CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass);

<CHANNEL extends Channel> Class<? extends CHANNEL> getChannelClass(Class<CHANNEL> channelClass);
<CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass, EventLoop eventLoop);

String getName();

<SERVERCHANNEL extends ServerChannel> SERVERCHANNEL getServerChannel(Class<SERVERCHANNEL> channelClass, EventLoop eventLoop,
EventLoopGroup childEventLoopGroup);

EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory);

boolean supportGroup(EventLoopGroup group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
import java.util.concurrent.ThreadFactory;

import io.netty5.channel.Channel;
import io.netty5.channel.EventLoop;
import io.netty5.channel.EventLoopGroup;
import io.netty5.channel.MultithreadEventLoopGroup;
import io.netty5.channel.ServerChannel;
import io.netty5.channel.epoll.Epoll;
import io.netty5.channel.epoll.EpollDatagramChannel;
import io.netty5.channel.epoll.EpollDomainDatagramChannel;
import io.netty5.channel.epoll.EpollDomainSocketChannel;
import io.netty5.channel.epoll.EpollEventLoopGroup;
import io.netty5.channel.epoll.EpollHandler;
import io.netty5.channel.epoll.EpollServerDomainSocketChannel;
import io.netty5.channel.epoll.EpollServerSocketChannel;
import io.netty5.channel.epoll.EpollSocketChannel;
Expand All @@ -46,48 +49,40 @@ final class DefaultLoopEpoll implements DefaultLoop {

@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass, EventLoop eventLoop) {
if (channelClass.equals(SocketChannel.class)) {
return (CHANNEL) new EpollSocketChannel();
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (CHANNEL) new EpollServerSocketChannel();
return (CHANNEL) new EpollSocketChannel(eventLoop);
}
if (channelClass.equals(DatagramChannel.class)) {
return (CHANNEL) new EpollDatagramChannel();
return (CHANNEL) new EpollDatagramChannel(eventLoop);
}
if (channelClass.equals(DomainSocketChannel.class)) {
return (CHANNEL) new EpollDomainSocketChannel();
}
if (channelClass.equals(ServerDomainSocketChannel.class)) {
return (CHANNEL) new EpollServerDomainSocketChannel();
return (CHANNEL) new EpollDomainSocketChannel(eventLoop);
}
if (channelClass.equals(DomainDatagramChannel.class)) {
return (CHANNEL) new EpollDomainDatagramChannel();
return (CHANNEL) new EpollDomainDatagramChannel(eventLoop);
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

@Override
public String getName() {
return "epoll";
}

@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> Class<? extends CHANNEL> getChannelClass(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (Class<? extends CHANNEL>) EpollSocketChannel.class;
}
public <SERVERCHANNEL extends ServerChannel> SERVERCHANNEL getServerChannel(Class<SERVERCHANNEL> channelClass, EventLoop eventLoop,
EventLoopGroup childEventLoopGroup) {
if (channelClass.equals(ServerSocketChannel.class)) {
return (Class<? extends CHANNEL>) EpollServerSocketChannel.class;
return (SERVERCHANNEL) new EpollServerSocketChannel(eventLoop, childEventLoopGroup);
}
if (channelClass.equals(DatagramChannel.class)) {
return (Class<? extends CHANNEL>) EpollDatagramChannel.class;
if (channelClass.equals(ServerDomainSocketChannel.class)) {
return (SERVERCHANNEL) new EpollServerDomainSocketChannel(eventLoop, childEventLoopGroup);
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

@Override
public String getName() {
return "epoll";
}

@Override
public EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory) {
return new EpollEventLoopGroup(threads, factory);
Expand Down Expand Up @@ -119,4 +114,11 @@ public boolean supportGroup(EventLoopGroup group) {
log.debug("Default Epoll support : " + epoll);
}
}

static final class EpollEventLoopGroup extends MultithreadEventLoopGroup {

EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory, EpollHandler.newFactory());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
import java.util.concurrent.ThreadFactory;

import io.netty5.channel.Channel;
import io.netty5.channel.EventLoop;
import io.netty5.channel.EventLoopGroup;
import io.netty5.channel.MultithreadEventLoopGroup;
import io.netty5.channel.ServerChannel;
import io.netty5.channel.kqueue.KQueue;
import io.netty5.channel.kqueue.KQueueDatagramChannel;
import io.netty5.channel.kqueue.KQueueDomainDatagramChannel;
import io.netty5.channel.kqueue.KQueueDomainSocketChannel;
import io.netty5.channel.kqueue.KQueueEventLoopGroup;
import io.netty5.channel.kqueue.KQueueHandler;
import io.netty5.channel.kqueue.KQueueServerDomainSocketChannel;
import io.netty5.channel.kqueue.KQueueServerSocketChannel;
import io.netty5.channel.kqueue.KQueueSocketChannel;
Expand All @@ -45,48 +48,40 @@ final class DefaultLoopKQueue implements DefaultLoop {

@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass, EventLoop eventLoop) {
if (channelClass.equals(SocketChannel.class)) {
return (CHANNEL) new KQueueSocketChannel();
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (CHANNEL) new KQueueServerSocketChannel();
return (CHANNEL) new KQueueSocketChannel(eventLoop);
}
if (channelClass.equals(DatagramChannel.class)) {
return (CHANNEL) new KQueueDatagramChannel();
return (CHANNEL) new KQueueDatagramChannel(eventLoop);
}
if (channelClass.equals(DomainSocketChannel.class)) {
return (CHANNEL) new KQueueDomainSocketChannel();
}
if (channelClass.equals(ServerDomainSocketChannel.class)) {
return (CHANNEL) new KQueueServerDomainSocketChannel();
return (CHANNEL) new KQueueDomainSocketChannel(eventLoop);
}
if (channelClass.equals(DomainDatagramChannel.class)) {
return (CHANNEL) new KQueueDomainDatagramChannel();
return (CHANNEL) new KQueueDomainDatagramChannel(eventLoop);
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

@Override
public String getName() {
return "kqueue";
}

@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> Class<? extends CHANNEL> getChannelClass(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (Class<? extends CHANNEL>) KQueueSocketChannel.class;
}
public <SERVERCHANNEL extends ServerChannel> SERVERCHANNEL getServerChannel(Class<SERVERCHANNEL> channelClass, EventLoop eventLoop,
EventLoopGroup childEventLoopGroup) {
if (channelClass.equals(ServerSocketChannel.class)) {
return (Class<? extends CHANNEL>) KQueueServerSocketChannel.class;
return (SERVERCHANNEL) new KQueueServerSocketChannel(eventLoop, childEventLoopGroup);
}
if (channelClass.equals(DatagramChannel.class)) {
return (Class<? extends CHANNEL>) KQueueDatagramChannel.class;
if (channelClass.equals(ServerDomainSocketChannel.class)) {
return (SERVERCHANNEL) new KQueueServerDomainSocketChannel(eventLoop, childEventLoopGroup);
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

@Override
public String getName() {
return "kqueue";
}

@Override
public EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory) {
return new KQueueEventLoopGroup(threads, factory);
Expand Down Expand Up @@ -118,4 +113,11 @@ public boolean supportGroup(EventLoopGroup group) {
log.debug("Default KQueue support : " + kqueue);
}
}

static final class KQueueEventLoopGroup extends MultithreadEventLoopGroup {

KQueueEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory, KQueueHandler.newFactory());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.concurrent.ThreadFactory;

import io.netty5.channel.Channel;
import io.netty5.channel.EventLoop;
import io.netty5.channel.EventLoopGroup;
import io.netty5.channel.ServerChannel;
import io.netty5.channel.socket.DatagramChannel;
import io.netty5.channel.socket.ServerSocketChannel;
import io.netty5.channel.socket.SocketChannel;
Expand All @@ -37,39 +39,31 @@ final class DefaultLoopNIO implements DefaultLoop {

@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass, EventLoop eventLoop) {
if (channelClass.equals(SocketChannel.class)) {
return (CHANNEL) new NioSocketChannel();
}
if (channelClass.equals(ServerSocketChannel.class)) {
return (CHANNEL) new NioServerSocketChannel();
return (CHANNEL) new NioSocketChannel(eventLoop);
}
if (channelClass.equals(DatagramChannel.class)) {
return (CHANNEL) new NioDatagramChannel();
return (CHANNEL) new NioDatagramChannel(eventLoop);
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

@Override
public String getName() {
return "nio";
}

@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> Class<? extends CHANNEL> getChannelClass(Class<CHANNEL> channelClass) {
if (channelClass.equals(SocketChannel.class)) {
return (Class<? extends CHANNEL>) NioSocketChannel.class;
}
public <SERVERCHANNEL extends ServerChannel> SERVERCHANNEL getServerChannel(Class<SERVERCHANNEL> channelClass, EventLoop eventLoop,
EventLoopGroup childEventLoopGroup) {
if (channelClass.equals(ServerSocketChannel.class)) {
return (Class<? extends CHANNEL>) NioServerSocketChannel.class;
}
if (channelClass.equals(DatagramChannel.class)) {
return (Class<? extends CHANNEL>) NioDatagramChannel.class;
return (SERVERCHANNEL) new NioServerSocketChannel(eventLoop, childEventLoopGroup);
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

@Override
public String getName() {
return "nio";
}

@Override
public EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory) {
throw new IllegalStateException("Missing Epoll/KQueue on current system");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.util.Objects;

import io.netty5.channel.Channel;
import io.netty5.channel.EventLoop;
import io.netty5.channel.EventLoopGroup;
import io.netty5.channel.ServerChannel;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.netty.ReactorNetty;
Expand Down Expand Up @@ -201,34 +203,17 @@ default Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
* Callback for a {@link Channel} selection.
*
* @param channelType the channel type
* @param group the source {@link EventLoopGroup} to assign a loop from
* @param eventLoop the {@link EventLoop} to assign
* @param <CHANNEL> the {@link Channel} implementation
* @return a {@link Channel} instance
*/
default <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
default <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoop eventLoop) {
DefaultLoop channelFactory =
DefaultLoopNativeDetector.INSTANCE.supportGroup(group) ?
DefaultLoopNativeDetector.INSTANCE.supportGroup(eventLoop) ?
DefaultLoopNativeDetector.INSTANCE :
DefaultLoopNativeDetector.NIO;

return channelFactory.getChannel(channelType);
}

/**
* Callback for a {@link Channel} class selection.
*
* @param channelType the channel type
* @param group the source {@link EventLoopGroup} to assign a loop from
* @param <CHANNEL> the {@link Channel} implementation
* @return a {@link Channel} class
*/
default <CHANNEL extends Channel> Class<? extends CHANNEL> onChannelClass(Class<CHANNEL> channelType, EventLoopGroup group) {
DefaultLoop channelFactory =
DefaultLoopNativeDetector.INSTANCE.supportGroup(group) ?
DefaultLoopNativeDetector.INSTANCE :
DefaultLoopNativeDetector.NIO;

return channelFactory.getChannelClass(channelType);
return channelFactory.getChannel(channelType, eventLoop);
}

/**
Expand All @@ -250,6 +235,25 @@ default EventLoopGroup onClient(boolean useNative) {
*/
EventLoopGroup onServer(boolean useNative);

/**
* Callback for a {@link Channel} selection.
*
* @param <SERVERCHANNEL> the {@link ServerChannel} implementation
* @param channelType the channel type
* @param eventLoop the {@link EventLoop} to assign
* @param childEventLoopGroup the {@link EventLoop} to assign for child {@link Channel}
* @return a {@link ServerChannel} instance
*/
default <SERVERCHANNEL extends ServerChannel> SERVERCHANNEL onServerChannel(Class<SERVERCHANNEL> channelType, EventLoop eventLoop,
EventLoopGroup childEventLoopGroup) {
DefaultLoop channelFactory =
DefaultLoopNativeDetector.INSTANCE.supportGroup(eventLoop) ?
DefaultLoopNativeDetector.INSTANCE :
DefaultLoopNativeDetector.NIO;

return channelFactory.getServerChannel(channelType, eventLoop, childEventLoopGroup);
}

/**
* Callback for server select {@link EventLoopGroup} creation,
* this is the {@link EventLoopGroup} for the acceptor channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.function.Supplier;

import io.netty5.channel.Channel;
import io.netty5.channel.EventLoop;
import io.netty5.channel.EventLoopGroup;
import io.netty5.channel.ServerChannel;
import io.netty5.resolver.AddressResolverGroup;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
Expand Down Expand Up @@ -245,17 +247,10 @@ public String name() {
}

@Override
public <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
public <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoop eventLoop) {
requireNonNull(channelType, "channelType");
requireNonNull(group, "group");
return defaultLoops.onChannel(channelType, group);
}

@Override
public <CHANNEL extends Channel> Class<? extends CHANNEL> onChannelClass(Class<CHANNEL> channelType, EventLoopGroup group) {
requireNonNull(channelType, "channelType");
requireNonNull(group, "group");
return defaultLoops.onChannelClass(channelType, group);
requireNonNull(eventLoop, "eventLoop");
return defaultLoops.onChannel(channelType, eventLoop);
}

@Override
Expand All @@ -268,6 +263,15 @@ public EventLoopGroup onServer(boolean useNative) {
return defaultLoops.onServer(useNative);
}

@Override
public <SERVERCHANNEL extends ServerChannel> SERVERCHANNEL onServerChannel(Class<SERVERCHANNEL> channelType,
EventLoop eventLoop, EventLoopGroup childEventLoopGroup) {
requireNonNull(channelType, "channelType");
requireNonNull(eventLoop, "eventLoop");
requireNonNull(childEventLoopGroup, "childEventLoopGroup");
return defaultLoops.onServerChannel(channelType, eventLoop, childEventLoopGroup);
}

@Override
public EventLoopGroup onServerSelect(boolean useNative) {
return defaultLoops.onServerSelect(useNative);
Expand Down
Loading