ARTEMIS-994 Support Netty Native Epoll on Linux

The following changes are made to support Epoll.

Refactored SharedNioEventLoopGroup into renamed SharedEventLoopGroup to be generic (as so we can re-use for both Nio and Epoll)

Add support and toggles for Epoll in NettyAcceptor and NettyConnector (with fall back to NIO if cannot load Epoll)

Removal from code of PartialPooledByteBufAllocator, caused bad address when doing native, and no longer needed - see jira discussion

New Connector Properties:

useEpoll - toggles to use epoll or not, default true (but we failback to nio gracefully)
remotingThreads = same behaviour as nioRemotingThreads. Previous property is depreated.
useGlobalWorkerPool = same behaviour as useNioGlobalWorkerPool. Old property is deprecated.

New Acceptor Properties:

useEpoll - toggles to use epoll or not, default true (but we failback to nio gracefully)
useGlobalWorkerPool = same behaviour as useNioGlobalWorkerPool but for Epoll.

This closes #1093
This commit is contained in:
Michael André Pearce 2017-03-15 06:06:45 +00:00 committed by Justin Bertram
parent 2c9b02806d
commit a610748c09
6 changed files with 333 additions and 42 deletions

View File

@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
public class DelegatingEventLoopGroup implements EventLoopGroup {
private final EventLoopGroup delegate;
public DelegatingEventLoopGroup(EventLoopGroup eventLoopGroup) {
this.delegate = eventLoopGroup;
}
@Override
public EventLoop next() {
return delegate.next();
}
@Override
public ChannelFuture register(Channel channel) {
return delegate.register(channel);
}
@Override
public ChannelFuture register(ChannelPromise channelPromise) {
return delegate.register(channelPromise);
}
@Override
@Deprecated
public ChannelFuture register(Channel channel, ChannelPromise channelPromise) {
return delegate.register(channel, channelPromise);
}
@Override
public boolean isShuttingDown() {
return delegate.isShuttingDown();
}
@Override
public Future<?> shutdownGracefully() {
return delegate.shutdownGracefully();
}
@Override
public Future<?> shutdownGracefully(long l, long l1, TimeUnit timeUnit) {
return delegate.shutdownGracefully(l, l1, timeUnit);
}
@Override
public Future<?> terminationFuture() {
return delegate.terminationFuture();
}
@Override
@Deprecated
public void shutdown() {
delegate.shutdown();
}
@Override
@Deprecated
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}
@Override
public Iterator<EventExecutor> iterator() {
return delegate.iterator();
}
@Override
public Future<?> submit(Runnable runnable) {
return delegate.submit(runnable);
}
@Override
public <T> Future<T> submit(Runnable runnable, T t) {
return delegate.submit(runnable, t);
}
@Override
public <T> Future<T> submit(Callable<T> callable) {
return delegate.submit(callable);
}
@Override
public io.netty.util.concurrent.ScheduledFuture<?> schedule(Runnable runnable, long l, TimeUnit timeUnit) {
return delegate.schedule(runnable, l, timeUnit);
}
@Override
public <V> io.netty.util.concurrent.ScheduledFuture<V> schedule(Callable<V> callable, long l, TimeUnit timeUnit) {
return delegate.schedule(callable, l, timeUnit);
}
@Override
public io.netty.util.concurrent.ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable,
long l,
long l1,
TimeUnit timeUnit) {
return delegate.scheduleAtFixedRate(runnable, l, l1, timeUnit);
}
@Override
public io.netty.util.concurrent.ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable,
long l,
long l1,
TimeUnit timeUnit) {
return delegate.scheduleWithFixedDelay(runnable, l, l1, timeUnit);
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return delegate.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
delegate.execute(command);
}
@Override
public void forEach(Consumer<? super EventExecutor> action) {
delegate.forEach(action);
}
@Override
public Spliterator<EventExecutor> spliterator() {
return delegate.spliterator();
}
}

View File

@ -23,7 +23,6 @@ import java.util.Map;
import java.util.concurrent.Semaphore;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -216,7 +215,7 @@ public class NettyConnection implements Connection {
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
}
@Override

View File

@ -58,6 +58,9 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -218,6 +221,12 @@ public class NettyConnector extends AbstractConnector {
private boolean useNioGlobalWorkerPool;
private boolean useEpoll;
private int epollRemotingThreads;
private boolean useEpollGlobalWorkerPool;
private ScheduledExecutorService scheduledThreadPool;
private Executor closeExecutor;
@ -288,6 +297,13 @@ public class NettyConnector extends AbstractConnector {
useNioGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME, TransportConstants.DEFAULT_USE_NIO_GLOBAL_WORKER_POOL, configuration);
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
epollRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME, -1, configuration);
useEpollGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL_GLOBAL_WORKER_POOL, configuration);
useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
@ -371,22 +387,46 @@ public class NettyConnector extends AbstractConnector {
return;
}
int threadsToUse;
// Default to number of cores * 3
int defaultThreadsToUse = Runtime.getRuntime().availableProcessors() * 3;
if (nioRemotingThreads == -1) {
// Default to number of cores * 3
if (useEpoll) {
if (Epoll.isAvailable()) {
int epollThreadsToUse;
if (epollRemotingThreads == -1) {
epollThreadsToUse = defaultThreadsToUse;
} else {
epollThreadsToUse = this.epollRemotingThreads;
}
if (useEpollGlobalWorkerPool) {
channelClazz = EpollSocketChannel.class;
group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(epollThreadsToUse, threadFactory)));
} else {
channelClazz = EpollSocketChannel.class;
group = new EpollEventLoopGroup(epollThreadsToUse);
}
logger.info("Connector using native epoll");
threadsToUse = Runtime.getRuntime().availableProcessors() * 3;
} else {
threadsToUse = this.nioRemotingThreads;
} else {
logger.warn("Connector unable to load native epoll, will continue and load nio");
}
}
if (useNioGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;
group = SharedNioEventLoopGroup.getInstance(threadsToUse);
} else {
channelClazz = NioSocketChannel.class;
group = new NioEventLoopGroup(threadsToUse);
if (channelClazz == null || group == null) {
int nioThreadsToUse;
if (nioRemotingThreads == -1) {
nioThreadsToUse = defaultThreadsToUse;
} else {
nioThreadsToUse = this.nioRemotingThreads;
}
if (useNioGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;
group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(nioThreadsToUse, threadFactory)));
} else {
channelClazz = NioSocketChannel.class;
group = new NioEventLoopGroup(nioThreadsToUse);
}
logger.info("Connector using nio");
}
// if we are a servlet wrap the socketChannelFactory
@ -1053,7 +1093,7 @@ public class NettyConnector extends AbstractConnector {
}
public static void clearThreadPools() {
SharedNioEventLoopGroup.forceShutdown();
SharedEventLoopGroup.forceShutdown();
}
private static ClassLoader getThisClassLoader() {

View File

@ -23,8 +23,9 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
@ -32,41 +33,41 @@ import io.netty.util.concurrent.Promise;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
public class SharedNioEventLoopGroup extends NioEventLoopGroup {
public class SharedEventLoopGroup extends DelegatingEventLoopGroup {
private static SharedNioEventLoopGroup instance;
private static SharedEventLoopGroup instance;
private final AtomicReference<ScheduledFuture<?>> shutdown = new AtomicReference<>();
private final AtomicLong nioChannelFactoryCount = new AtomicLong();
private final AtomicLong channelFactoryCount = new AtomicLong();
private final Promise<?> terminationPromise = ImmediateEventExecutor.INSTANCE.newPromise();
private SharedNioEventLoopGroup(int numThreads, ThreadFactory factory) {
super(numThreads, factory);
private SharedEventLoopGroup(EventLoopGroup eventLoopGroup) {
super(eventLoopGroup);
}
public static synchronized void forceShutdown() {
if (instance != null) {
instance.shutdown();
instance.nioChannelFactoryCount.set(0);
instance.channelFactoryCount.set(0);
instance = null;
}
}
public static synchronized SharedNioEventLoopGroup getInstance(int numThreads) {
public static synchronized SharedEventLoopGroup getInstance(Function<ThreadFactory, EventLoopGroup> eventLoopGroupSupplier) {
if (instance != null) {
ScheduledFuture f = instance.shutdown.getAndSet(null);
if (f != null) {
f.cancel(false);
}
} else {
instance = new SharedNioEventLoopGroup(numThreads, AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
instance = new SharedEventLoopGroup(eventLoopGroupSupplier.apply(AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
}));
})));
}
instance.nioChannelFactoryCount.incrementAndGet();
instance.channelFactoryCount.incrementAndGet();
return instance;
}
@ -82,13 +83,13 @@ public class SharedNioEventLoopGroup extends NioEventLoopGroup {
@Override
public Future<?> shutdownGracefully(final long l, final long l2, final TimeUnit timeUnit) {
if (nioChannelFactoryCount.decrementAndGet() == 0) {
if (channelFactoryCount.decrementAndGet() == 0) {
shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
synchronized (SharedNioEventLoopGroup.class) {
synchronized (SharedEventLoopGroup.class) {
if (shutdown.get() != null) {
Future<?> future = SharedNioEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit);
Future<?> future = SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit);
future.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future future) throws Exception {

View File

@ -51,6 +51,10 @@ public class TransportConstants {
public static final String USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME = "useNioGlobalWorkerPool";
public static final String USE_EPOLL_PROP_NAME = "useEpoll";
public static final String USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME = "useEpollGlobalWorkerPool";
public static final String USE_INVM_PROP_NAME = "useInvm";
public static final String ACTIVEMQ_SERVER_NAME = "activemqServerName";
@ -113,6 +117,8 @@ public class TransportConstants {
public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads";
public static final String EPOLL_REMOTING_THREADS_PROPNAME = "epollRemotingThreads";
public static final String BATCH_DELAY = "batchDelay";
public static final String DIRECT_DELIVER = "directDeliver";
@ -127,6 +133,10 @@ public class TransportConstants {
public static final boolean DEFAULT_USE_NIO_GLOBAL_WORKER_POOL = true;
public static final boolean DEFAULT_USE_EPOLL_GLOBAL_WORKER_POOL = true;
public static final boolean DEFAULT_USE_EPOLL = true;
public static final boolean DEFAULT_USE_INVM = false;
public static final boolean DEFAULT_USE_SERVLET = false;
@ -218,6 +228,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.PROTOCOLS_PROP_NAME);
@ -237,6 +248,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
@ -267,6 +279,8 @@ public class TransportConstants {
allowableConnectorKeys.add(TransportConstants.SERVLET_PATH);
allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.LOCAL_ADDRESS_PROP_NAME);
@ -284,6 +298,7 @@ public class TransportConstants {
allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
allowableConnectorKeys.add(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME);
allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);
allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());

View File

@ -44,6 +44,9 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
@ -117,6 +120,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private final boolean useInvm;
private final boolean useEpoll;
private final ProtocolHandler protocolHandler;
private final String host;
@ -154,6 +159,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private final int nioRemotingThreads;
private final int epollRemotingThreads;
private final ConcurrentMap<Object, NettyServerConnection> connections = new ConcurrentHashMap<>();
private final Map<String, Object> configuration;
@ -202,6 +209,11 @@ public class NettyAcceptor extends AbstractAcceptor {
sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
nioRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration);
epollRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME, -1, configuration);
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
@ -270,22 +282,48 @@ public class NettyAcceptor extends AbstractAcceptor {
channelClazz = LocalServerChannel.class;
eventLoopGroup = new LocalEventLoopGroup();
} else {
int threadsToUse;
// Default to number of cores * 3
int defaultThreadsToUse = Runtime.getRuntime().availableProcessors() * 3;
if (nioRemotingThreads == -1) {
// Default to number of cores * 3
if (useEpoll) {
if (Epoll.isAvailable()) {
int epollThreadsToUse;
if (epollRemotingThreads == -1) {
epollThreadsToUse = defaultThreadsToUse;
} else {
epollThreadsToUse = this.epollRemotingThreads;
}
threadsToUse = Runtime.getRuntime().availableProcessors() * 3;
} else {
threadsToUse = this.nioRemotingThreads;
}
channelClazz = NioServerSocketChannel.class;
eventLoopGroup = new NioEventLoopGroup(threadsToUse, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
channelClazz = EpollServerSocketChannel.class;
eventLoopGroup = new EpollEventLoopGroup(epollThreadsToUse, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
}));
logger.info("Acceptor using native epoll");
} else {
logger.warn("Acceptor unable to load native epoll, will continue and load nio");
}
}));
}
if (channelClazz == null || eventLoopGroup == null) {
int nioThreadsToUse;
if (nioRemotingThreads == -1) {
nioThreadsToUse = defaultThreadsToUse;
} else {
nioThreadsToUse = nioRemotingThreads;
}
channelClazz = NioServerSocketChannel.class;
eventLoopGroup = new NioEventLoopGroup(nioThreadsToUse, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
}));
logger.info("Acceptor using nio");
}
}
bootstrap = new ServerBootstrap();