ARTEMIS-2440 Connection.fail on sendBlock should be asynchronous

This is following up on ARTEMIS-2327.
This commit is contained in:
Clebert Suconic 2019-08-02 12:51:13 -04:00
parent 8d7831a838
commit cd723aa528
8 changed files with 85 additions and 7 deletions

View File

@ -492,7 +492,7 @@ public final class ChannelImpl implements Channel {
if (response == null) { if (response == null) {
ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(), packet.getType()); ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(), packet.getType());
connection.fail(e); connection.asyncFail(e);
throw e; throw e;
} }

View File

@ -20,6 +20,8 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -218,6 +220,23 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
fail(me, null); fail(me, null);
} }
@Override
public Future asyncFail(final ActiveMQException me) {
FutureTask<Void> task = new FutureTask(() -> {
fail(me);
return null;
});
if (executor == null) {
// only tests cases can do this
task.run();
} else {
executor.execute(task);
}
return task;
}
@Override @Override
public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
dataReceived = true; dataReceived = true;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.spi.core.protocol; package org.apache.activemq.artemis.spi.core.protocol;
import java.util.List; import java.util.List;
import java.util.concurrent.Future;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -132,6 +133,12 @@ public interface RemotingConnection extends BufferHandler {
*/ */
void fail(ActiveMQException me); void fail(ActiveMQException me);
/** Same thing as fail, but using an executor.
* semantic of send here, is asynchrounous.
* @param me
*/
Future asyncFail(ActiveMQException me);
/** /**
* called when the underlying connection fails. * called when the underlying connection fails.
* *

View File

@ -20,6 +20,7 @@ import javax.security.auth.Subject;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.List; import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -317,6 +318,11 @@ public class ChannelImplTest {
} }
@Override
public Future asyncFail(ActiveMQException me) {
return null;
}
@Override @Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) { public void fail(ActiveMQException me, String scaleDownTargetNodeID) {

View File

@ -20,6 +20,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -166,6 +168,22 @@ public class MQTTConnection implements RemotingConnection {
} }
} }
@Override
public Future asyncFail(ActiveMQException me) {
FutureTask<Void> task = new FutureTask(() -> {
fail(me);
return null;
});
// I don't expect asyncFail happening on MQTT, in case of happens this is semantically correct
Thread t = new Thread(task);
t.start();
return task;
}
@Override @Override
public void destroy() { public void destroy() {
//TODO(mtaylor) ensure this properly destroys this connection. //TODO(mtaylor) ensure this properly destroys this connection.

View File

@ -25,6 +25,8 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@ -110,7 +112,7 @@ public final class StompConnection implements RemotingConnection {
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorFactory factory; private final ExecutorFactory executorFactory;
@Override @Override
public boolean isSupportReconnect() { public boolean isSupportReconnect() {
@ -136,7 +138,7 @@ public final class StompConnection implements RemotingConnection {
case ActiveMQStompException.INVALID_EOL_V10: case ActiveMQStompException.INVALID_EOL_V10:
if (version != null) if (version != null)
throw e; throw e;
frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, factory); frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, executorFactory);
buffer.resetReaderIndex(); buffer.resetReaderIndex();
frame = decode(buffer); frame = decode(buffer);
break; break;
@ -164,16 +166,16 @@ public final class StompConnection implements RemotingConnection {
final Connection transportConnection, final Connection transportConnection,
final StompProtocolManager manager, final StompProtocolManager manager,
final ScheduledExecutorService scheduledExecutorService, final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory factory) { final ExecutorFactory executorFactory) {
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.factory = factory; this.executorFactory = executorFactory;
this.transportConnection = transportConnection; this.transportConnection = transportConnection;
this.manager = manager; this.manager = manager;
this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, factory); this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, executorFactory);
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
@ -379,6 +381,24 @@ public final class StompConnection implements RemotingConnection {
internalClose(); internalClose();
} }
@Override
public Future asyncFail(ActiveMQException me) {
FutureTask<Void> task = new FutureTask(() -> {
fail(me);
return null;
});
if (this.executorFactory == null) {
// only tests cases can do this
task.run();
} else {
executorFactory.getExecutor().execute(task);
}
return task;
}
@Override @Override
public void fail(final ActiveMQException me, String scaleDownTargetNodeID) { public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
fail(me); fail(me);
@ -528,7 +548,7 @@ public final class StompConnection implements RemotingConnection {
} }
if (this.version != (StompVersions.V1_0)) { if (this.version != (StompVersions.V1_0)) {
VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, factory); VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, executorFactory);
newHandler.initDecoder(this.frameHandler); newHandler.initDecoder(this.frameHandler);
this.frameHandler = newHandler; this.frameHandler = newHandler;
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.management.impl;
import javax.security.auth.Subject; import javax.security.auth.Subject;
import java.util.List; import java.util.List;
import java.util.concurrent.Future;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -110,6 +111,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
} }
@Override
public Future asyncFail(ActiveMQException me) {
return null;
}
@Override @Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) { public void fail(ActiveMQException me, String scaleDownTargetNodeID) {

View File

@ -58,6 +58,7 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName()); ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessAcknowledgeCauseResponseTimeout.class.getName()); ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessAcknowledgeCauseResponseTimeout.class.getName());
((ActiveMQConnectionFactory) cf).setBlockOnAcknowledge(true); ((ActiveMQConnectionFactory) cf).setBlockOnAcknowledge(true);
((ActiveMQConnectionFactory) cf).setCallTimeout(500);
sendConnection = cf.createConnection(); sendConnection = cf.createConnection();
@ -108,6 +109,7 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
try { try {
((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName()); ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessSendCauseResponseTimeout.class.getName()); ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessSendCauseResponseTimeout.class.getName());
((ActiveMQConnectionFactory) cf).setCallTimeout(500);
sendConnection = cf.createConnection(); sendConnection = cf.createConnection();
sendConnection.setExceptionListener(exceptionOnConnection::set); sendConnection.setExceptionListener(exceptionOnConnection::set);