This commit is contained in:
Clebert Suconic 2019-08-05 14:20:31 -04:00
commit a098685117
13 changed files with 111 additions and 30 deletions

View File

@ -587,7 +587,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public ClientProtocolManager newProtocolManager() {
return getProtocolManagerFactory().newProtocolManager();
if (threadPool == null) {
throw new NullPointerException("No Thread Pool");
}
return getProtocolManagerFactory().newProtocolManager().setExecutor(new OrderedExecutor(threadPool));
}
@Override

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@ -82,6 +83,8 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
private ClientSessionFactoryInternal factoryInternal;
private Executor executor;
/**
* Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch}
*/
@ -157,6 +160,12 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
}
}
@Override
public ActiveMQClientProtocolManager setExecutor(Executor executor) {
this.executor = executor;
return this;
}
@Override
public Lock lockSessionCreation() {
try {
@ -412,7 +421,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
List<Interceptor> incomingInterceptors,
List<Interceptor> outgoingInterceptors,
TopologyResponseHandler topologyResponseHandler) {
this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, executor);
this.topologyResponseHandler = topologyResponseHandler;

View File

@ -92,8 +92,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
final long blockingCallTimeout,
final long blockingCallFailoverTimeout,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, null);
final List<Interceptor> outgoingInterceptors,
final Executor connectionExecutor) {
this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, connectionExecutor);
}
/*
@ -103,9 +104,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
final Connection transportConnection,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors,
final Executor executor,
final SimpleString nodeID) {
this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, outgoingInterceptors, false, executor, nodeID);
final SimpleString nodeID,
final Executor connectionExecutor) {
this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, outgoingInterceptors, false, nodeID, connectionExecutor);
}
private RemotingConnectionImpl(final PacketDecoder packetDecoder,
@ -115,9 +116,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors,
final boolean client,
final Executor executor,
final SimpleString nodeID) {
super(transportConnection, executor);
final SimpleString nodeID,
final Executor connectionExecutor) {
super(transportConnection, connectionExecutor);
this.packetDecoder = packetDecoder;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.spi.core.remoting;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import io.netty.channel.ChannelPipeline;
@ -27,6 +28,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public interface ClientProtocolManager {
ClientProtocolManager setExecutor(Executor executor);
/// Life Cycle Methods:
RemotingConnection connect(Connection transportConnection,

View File

@ -182,14 +182,6 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
}
}
public Executor getExeuctor() {
if (protonConnectionDelegate != null) {
return protonConnectionDelegate.getExecutor();
} else {
return null;
}
}
public void setConnection(AMQPConnectionContext connection) {
this.amqpConnection = connection;
}

View File

@ -46,17 +46,13 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager,
AMQPConnectionContext amqpConnection,
Connection transportConnection,
Executor executor) {
super(transportConnection, executor);
Executor connectionExecutor) {
super(transportConnection, connectionExecutor);
this.manager = manager;
this.amqpConnection = amqpConnection;
transportConnection.setProtocolConnection(this);
}
public Executor getExecutor() {
return this.executor;
}
public ProtonProtocolManager getManager() {
return manager;
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
/**
@ -44,6 +45,11 @@ public class ProtonClientProtocolManager extends ProtonProtocolManager implement
super(factory, server, Collections.emptyList(), Collections.emptyList());
}
@Override
public ClientProtocolManager setExecutor(Executor executor) {
return null;
}
@Override
public void stop() {
throw new UnsupportedOperationException();

View File

@ -198,9 +198,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public OpenWireConnection(Connection connection,
ActiveMQServer server,
Executor executor,
OpenWireProtocolManager openWireProtocolManager,
OpenWireFormat wf) {
OpenWireFormat wf,
Executor executor) {
super(connection, executor);
this.server = server;
this.operationContext = server.newOperationContext();

View File

@ -232,7 +232,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf);
OpenWireConnection owConn = new OpenWireConnection(connection, server, this, wf, server.getExecutorFactory().getExecutor());
owConn.sendHandshake();
//first we setup ttl to -1

View File

@ -328,15 +328,18 @@ public interface Configuration {
Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming);
/**
* deprecated: we decide based on the semantic context when to make things async or not
* Returns whether code coming from connection is executed asynchronously or not. <br>
* Default value is
* {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED}.
*/
@Deprecated
boolean isAsyncConnectionExecutionEnabled();
/**
* Sets whether code coming from connection is executed asynchronously or not.
*/
@Deprecated
Configuration setEnabledAsyncConnectionExecution(boolean enabled);
/**

View File

@ -116,7 +116,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
Executor connectionExecutor = server.getExecutorFactory().getExecutor();
final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID());
final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, server.getNodeID(), connectionExecutor);
Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);

View File

@ -49,7 +49,7 @@ public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
FileChannel fileChannel = raf.getChannel();
ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
null, 10, raf, fileChannel, 0, dataSize);
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize);
@ -69,7 +69,7 @@ public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
FileChannel fileChannel = raf.getChannel();
ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
null, fileId, raf, fileChannel, 0, 0);
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize());

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.jms.connection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.JMSException;
@ -50,6 +51,18 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
@Test(timeout = 60000)
public void testOnAcknowledge() throws Exception {
testOnAcknowledge(false);
}
@Test(timeout = 60000)
public void testOnAcknowledgeBlockOnFailover() throws Exception {
// this is validating a case where failover would block
// and yet the exception should already happen asynchronously
testOnAcknowledge(true);
}
public void testOnAcknowledge(boolean blockOnFailover) throws Exception {
mayBlock.set(blockOnFailover);
Connection sendConnection = null;
Connection connection = null;
AtomicReference<JMSException> exceptionOnConnection = new AtomicReference<>();
@ -86,6 +99,10 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
fail("JMSException expected");
} catch (JMSException e) {
if (blockOnFailover) {
Wait.assertTrue(blocked::get);
unblock();
}
assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException);
//Ensure JMS Connection ExceptionListener was also invoked
assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 2000, 100));
@ -102,6 +119,16 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
@Test(timeout = 60000)
public void testOnSend() throws Exception {
testOnSend(false);
}
@Test(timeout = 60000)
public void testOnSendBlockOnFailover() throws Exception {
testOnSend(true);
}
public void testOnSend(boolean blockOnFailover) throws Exception {
mayBlock.set(blockOnFailover);
Connection sendConnection = null;
Connection connection = null;
AtomicReference<JMSException> exceptionOnConnection = new AtomicReference<>();
@ -125,6 +152,10 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
fail("JMSException expected");
} catch (JMSException e) {
if (blockOnFailover) {
Wait.assertTrue(blocked::get);
unblock();
}
assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException);
//Ensure JMS Connection ExceptionListener was also invoked
assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 2000, 100));
@ -140,6 +171,30 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
}
}
static AtomicBoolean mayBlock = new AtomicBoolean(true);
static AtomicBoolean blocked = new AtomicBoolean(false);
private static void block() {
if (!mayBlock.get()) {
return;
}
blocked.set(true);
try {
long timeOut = System.currentTimeMillis() + 5000;
while (mayBlock.get() && System.currentTimeMillis() < timeOut) {
Thread.yield();
}
} finally {
blocked.set(false);
}
}
private static void unblock() {
mayBlock.set(false);
}
static Packet lastPacketSent;
@ -156,6 +211,12 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
// CheckForFailoverReply is ignored here, as this is simulating an issue where the server is completely not responding, the blocked call should throw an exception asynchrnously to the retry
if (packet.getType() == PacketImpl.CHECK_FOR_FAILOVER_REPLY) {
block();
return true;
}
if (lastPacketSent.getType() == PacketImpl.SESS_ACKNOWLEDGE && packet.getType() == PacketImpl.NULL_RESPONSE) {
return false;
}
@ -167,9 +228,16 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
// CheckForFailoverReply is ignored here, as this is simulating an issue where the server is completely not responding, the blocked call should throw an exception asynchrnously to the retry
if (packet.getType() == PacketImpl.CHECK_FOR_FAILOVER_REPLY) {
block();
return true;
}
if (lastPacketSent.getType() == PacketImpl.SESS_SEND && packet.getType() == PacketImpl.NULL_RESPONSE) {
return false;
}
return true;
}
}