ARTEMIS-2241 Support direct deliver for InVMAcceptors

Push isDirectDeliver method from netty impl, to the Connection interface
Add support to InVMConnection for isDirectDeliver flag and ability to set via config, defaulting to false, to keep current default behavior. 
Extend DirectDeliverTest to check InVM as well.
This commit is contained in:
Michael André Pearce 2019-01-25 00:35:55 +00:00 committed by Michael Andre Pearce
parent 43a1cc2822
commit b76f0061f8
8 changed files with 61 additions and 16 deletions

View File

@ -429,6 +429,7 @@ public class NettyConnection implements Connection {
return "tcp://" + IPV6Util.encloseHost(address.toString());
}
@Override
public final boolean isDirectDeliver() {
return directDeliver;
}

View File

@ -140,6 +140,8 @@ public interface Connection {
*/
TransportConfiguration getConnectorConfig();
boolean isDirectDeliver();
ActiveMQPrincipal getDefaultActiveMQPrincipal();
/**

View File

@ -411,6 +411,11 @@ public class ChannelImplTest {
return null;
}
@Override
public boolean isDirectDeliver() {
return false;
}
@Override
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null;

View File

@ -86,7 +86,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -192,11 +191,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
// use the same executor
this.packetActor = new Actor<>(callExecutor, this::onMessagePacket);
if (conn instanceof NettyConnection) {
direct = ((NettyConnection) conn).isDirectDeliver();
} else {
direct = false;
}
this.direct = conn.isDirectDeliver();
}
private void clearLargeMessage() {

View File

@ -75,6 +75,8 @@ public final class InVMAcceptor extends AbstractAcceptor {
private final boolean enableBufferPooling;
private final boolean directDeliver;
public InVMAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@ -101,6 +103,8 @@ public final class InVMAcceptor extends AbstractAcceptor {
connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, configuration);
enableBufferPooling = ConfigurationHelper.getBooleanProperty(TransportConstants.BUFFER_POOLING, TransportConstants.DEFAULT_BUFFER_POOLING, configuration);
directDeliver = ConfigurationHelper.getBooleanProperty(TransportConstants.DIRECT_DELIVER, TransportConstants.DEFAULT_DIRECT_DELIVER, configuration);
}
@Override
@ -228,6 +232,7 @@ public final class InVMAcceptor extends AbstractAcceptor {
InVMConnection inVMConnection = new InVMConnection(id, connectionID, remoteHandler, connectionListener, clientExecutor, defaultActiveMQPrincipal);
inVMConnection.setEnableBufferPooling(enableBufferPooling);
inVMConnection.setDirectDeliver(directDeliver);
connectionListener.connectionCreated(this, inVMConnection, protocolMap.get(ActiveMQClient.DEFAULT_CORE_PROTOCOL));
}

View File

@ -65,6 +65,8 @@ public class InVMConnection implements Connection {
private boolean bufferPoolingEnabled = TransportConstants.DEFAULT_BUFFER_POOLING;
private boolean directDeliver = TransportConstants.DEFAULT_DIRECT_DELIVER;
public InVMConnection(final int serverID,
final BufferHandler handler,
final BaseConnectionLifeCycleListener listener,
@ -86,6 +88,7 @@ public class InVMConnection implements Connection {
final BaseConnectionLifeCycleListener listener,
final Executor executor,
final ActiveMQPrincipal defaultActiveMQPrincipal) {
this.serverID = serverID;
this.handler = handler;
@ -275,6 +278,15 @@ public class InVMConnection implements Connection {
return new TransportConfiguration(InVMConnectorFactory.class.getName(), params);
}
@Override
public boolean isDirectDeliver() {
return directDeliver;
}
public void setDirectDeliver(boolean directDeliver) {
this.directDeliver = directDeliver;
}
@Override
public String toString() {
return "InVMConnection [serverID=" + serverID + ", id=" + id + "]";

View File

@ -30,6 +30,10 @@ public final class TransportConstants {
public static final boolean DEFAULT_BUFFER_POOLING = true;
public static final boolean DEFAULT_DIRECT_DELIVER = false;
public static final String DIRECT_DELIVER = "directDeliver";
private TransportConstants() {
// Utility class
}

View File

@ -29,8 +29,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -43,31 +43,52 @@ public class DirectDeliverTest extends ActiveMQTestBase {
private ActiveMQServer server;
private ServerLocator locator;
private ServerLocator nettyLocator;
private ServerLocator inVMLocator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.DIRECT_DELIVER, true);
Map<String, Object> nettyParams = new HashMap<>();
nettyParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DIRECT_DELIVER, true);
TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
TransportConfiguration nettyTransportConfiguration = new TransportConfiguration(NettyAcceptorFactory.class.getName(), nettyParams);
Map<String, Object> inVMParams = new HashMap<>();
inVMParams.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.DIRECT_DELIVER, true);
TransportConfiguration inVMTransportConfiguration = new TransportConfiguration(InVMAcceptorFactory.class.getName(), inVMParams);
Configuration config = createBasicConfig();
config.addAcceptorConfiguration(nettyTransportConfiguration);
config.addAcceptorConfiguration(inVMTransportConfiguration);
Configuration config = createBasicConfig().addAcceptorConfiguration(tc);
server = createServer(false, config);
server.start();
locator = createNettyNonHALocator();
addServerLocator(locator);
nettyLocator = createNettyNonHALocator();
addServerLocator(nettyLocator);
inVMLocator = createInVMLocator(0);
addServerLocator(inVMLocator);
}
@Test
public void testDirectDeliver() throws Exception {
public void testDirectDeliverNetty() throws Exception {
testDirectDeliver(nettyLocator);
}
@Test
public void testDirectDeliverInVM() throws Exception {
testDirectDeliver(inVMLocator);
}
private void testDirectDeliver(ServerLocator serverLocator) throws Exception {
final String foo = "foo";
ClientSessionFactory sf = createSessionFactory(locator);
ClientSessionFactory sf = createSessionFactory(serverLocator);
ClientSession session = sf.createSession();