ARTEMIS-1342: Support Netty Native KQueue on macOS

Add support for KQueue for when server or client runs on macOS. This is inline with the epoll support for linux.
This commit is contained in:
Michael Andre Pearce 2017-08-09 17:43:40 +01:00 committed by Clebert Suconic
parent 687e318d0e
commit 0bc5510059
9 changed files with 108 additions and 8 deletions

View File

@ -61,6 +61,7 @@ public final class Env {
private static final String OS = System.getProperty("os.name").toLowerCase();
private static final boolean IS_LINUX = OS.startsWith("linux");
private static final boolean IS_MAC = OS.startsWith("mac");
private static final boolean IS_64BIT = checkIs64bit();
private Env() {
@ -87,6 +88,10 @@ public final class Env {
return IS_LINUX == true;
}
public static boolean isMacOs() {
return IS_MAC == true;
}
public static boolean is64BitJvm() {
return IS_64BIT;
}

View File

@ -536,4 +536,9 @@ public interface ActiveMQClientLogger extends BasicLogger {
@Message(id = 214033, value = "Cannot resolve host ",
format = Message.Format.MESSAGE_FORMAT)
void unableToResolveHost(@Cause UnknownHostException e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 214034, value = "Unable to check KQueue availability ",
format = Message.Format.MESSAGE_FORMAT)
void unableToCheckKQueueAvailability(@Cause Throwable e);
}

View File

@ -25,9 +25,9 @@ import org.apache.activemq.artemis.utils.Env;
*/
public final class Epoll {
private static final boolean IS_AVAILABLE_EPOLL = isIsAvailableEpoll();
private static final boolean IS_EPOLL_AVAILABLE = isEpollAvailable();
private static boolean isIsAvailableEpoll() {
private static boolean isEpollAvailable() {
try {
if (Env.is64BitJvm() && Env.isLinuxOs()) {
return io.netty.channel.epoll.Epoll.isAvailable();
@ -46,6 +46,6 @@ public final class Epoll {
}
public static boolean isAvailable() {
return IS_AVAILABLE_EPOLL;
return IS_EPOLL_AVAILABLE;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.Env;
/**
* Tells if <a href="http://netty.io/wiki/native-transports.html">{@code netty-transport-native-kqueue}</a> is supported.
*/
public final class KQueue {
private static final boolean IS_KQUEUE_AVAILABLE = isKQueueAvailable();
private static boolean isKQueueAvailable() {
try {
if (Env.is64BitJvm() && Env.isMacOs()) {
return io.netty.channel.kqueue.KQueue.isAvailable();
} else {
return false;
}
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailability(e);
return false;
}
}
private KQueue() {
}
public static boolean isAvailable() {
return IS_KQUEUE_AVAILABLE;
}
}

View File

@ -65,6 +65,8 @@ import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.base64.Base64;
@ -232,6 +234,8 @@ public class NettyConnector extends AbstractConnector {
private boolean useEpoll;
private boolean useKQueue;
private int remotingThreads;
private boolean useGlobalWorkerPool;
@ -309,6 +313,7 @@ public class NettyConnector extends AbstractConnector {
useGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME, useGlobalWorkerPool, configuration);
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
@ -415,6 +420,15 @@ public class NettyConnector extends AbstractConnector {
channelClazz = EpollSocketChannel.class;
logger.debug("Connector " + this + " using native epoll");
} else if (useKQueue && KQueue.isAvailable()) {
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new KQueueEventLoopGroup(remotingThreads);
}
channelClazz = KQueueSocketChannel.class;
logger.debug("Connector " + this + " using native kqueue");
} else {
if (useGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;

View File

@ -53,6 +53,8 @@ public class TransportConstants {
public static final String USE_EPOLL_PROP_NAME = "useEpoll";
public static final String USE_KQUEUE_PROP_NAME = "useKQueue";
@Deprecated
/**
* @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME
@ -157,6 +159,8 @@ public class TransportConstants {
public static final boolean DEFAULT_USE_EPOLL = true;
public static final boolean DEFAULT_USE_KQUEUE = true;
public static final boolean DEFAULT_USE_INVM = false;
public static final boolean DEFAULT_USE_SERVLET = false;
@ -255,6 +259,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
//noinspection deprecation
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
@ -309,6 +314,7 @@ public class TransportConstants {
allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);

View File

@ -36,6 +36,7 @@
<bundle>mvn:io.netty/netty-codec/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-handler/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-epoll/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-kqueue/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-unix-common/${netty.version}</bundle>
</feature>

View File

@ -55,6 +55,8 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
@ -96,6 +98,7 @@ public class NettyAcceptor extends AbstractAcceptor {
public static String INVM_ACCEPTOR_TYPE = "IN-VM";
public static String NIO_ACCEPTOR_TYPE = "NIO";
public static String EPOLL_ACCEPTOR_TYPE = "EPOLL";
public static String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
static {
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use
@ -130,6 +133,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private final boolean useEpoll;
private final boolean useKQueue;
private final ProtocolHandler protocolHandler;
private final String host;
@ -228,6 +233,7 @@ public class NettyAcceptor extends AbstractAcceptor {
remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.REMOTING_THREADS_PROPNAME, remotingThreads, configuration);
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
@ -318,6 +324,17 @@ public class NettyAcceptor extends AbstractAcceptor {
acceptorType = EPOLL_ACCEPTOR_TYPE;
logger.debug("Acceptor using native epoll");
} else if (useKQueue && KQueue.isAvailable()) {
channelClazz = KQueueServerSocketChannel.class;
eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
}));
acceptorType = KQUEUE_ACCEPTOR_TYPE;
logger.debug("Acceptor using native kqueue");
} else {
channelClazz = NioServerSocketChannel.class;
eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {

View File

@ -98,23 +98,24 @@ under the License.
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- useKQueue means: it will use Netty kqueue if you are on a system (MacOS) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true;useKQueue=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true;useKQueue=true</acceptor>
</acceptors>