ARTEMIS-1333 SendACK listener fix

This commit is contained in:
Clebert Suconic 2017-08-08 22:48:25 -04:00
parent 96c6268f5a
commit fabc0701a3
6 changed files with 40 additions and 9 deletions

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -95,7 +94,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.SimpleFuture; import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.actors.Actor; import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
@ -150,7 +149,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final Actor<Packet> packetActor; private final Actor<Packet> packetActor;
private final Executor callExecutor; private final ArtemisExecutor callExecutor;
private final CoreProtocolManager manager; private final CoreProtocolManager manager;
@ -214,19 +213,20 @@ public class ServerSessionPacketHandler implements ChannelHandler {
public void connectionFailed(final ActiveMQException exception, boolean failedOver) { public void connectionFailed(final ActiveMQException exception, boolean failedOver) {
ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName()); ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
flushExecutor();
try { try {
session.close(true); session.close(true);
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorClosingSession(e); ActiveMQServerLogger.LOGGER.errorClosingSession(e);
} }
flushExecutor();
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName()); ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
} }
private void flushExecutor() { public void flushExecutor() {
packetActor.flush(); packetActor.flush();
OrderedExecutorFactory.flushExecutor(callExecutor); callExecutor.flush();
} }
public void close() { public void close() {
@ -247,7 +247,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
@Override @Override
public void handlePacket(final Packet packet) { public void handlePacket(final Packet packet) {
channel.confirm(packet);
// This method will call onMessagePacket through an actor // This method will call onMessagePacket through an actor
packetActor.act(packet); packetActor.act(packet);
@ -838,6 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final boolean flush, final boolean flush,
final boolean closeChannel) { final boolean closeChannel) {
if (confirmPacket != null) { if (confirmPacket != null) {
channel.confirm(confirmPacket);
if (flush) { if (flush) {
channel.flushConfirmations(); channel.flushConfirmations();
} }

View File

@ -167,10 +167,13 @@ public class ActiveMQPacketHandler implements ChannelHandler {
routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST); routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST);
} }
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap); CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel);
channel.setHandler(handler); channel.setHandler(handler);
sessionCallback.setSessionHandler(handler);
// TODO - where is this removed? // TODO - where is this removed?
protocolManager.addSessionHandler(request.getName(), handler); protocolManager.addSessionHandler(request.getName(), handler);

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
@ -44,6 +45,8 @@ public final class CoreSessionCallback implements SessionCallback {
private String name; private String name;
private ServerSessionPacketHandler handler;
public CoreSessionCallback(String name, public CoreSessionCallback(String name,
ProtocolManager protocolManager, ProtocolManager protocolManager,
Channel channel, Channel channel,
@ -54,6 +57,21 @@ public final class CoreSessionCallback implements SessionCallback {
this.connection = connection; this.connection = connection;
} }
public CoreSessionCallback setSessionHandler(ServerSessionPacketHandler handler) {
this.handler = handler;
return this;
}
@Override
public void close(boolean failed) {
ServerSessionPacketHandler localHandler = handler;
if (failed && localHandler != null) {
// We wait any pending tasks before we make this as closed
localHandler.flushExecutor();
}
this.handler = null;
}
@Override @Override
public boolean isWritable(ReadyListener callback, Object protocolContext) { public boolean isWritable(ReadyListener callback, Object protocolContext) {
return connection.isWritable(callback); return connection.isWritable(callback);

View File

@ -345,6 +345,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
protected void doClose(final boolean failed) throws Exception { protected void doClose(final boolean failed) throws Exception {
callback.close(failed);
synchronized (this) { synchronized (this) {
if (!closed) { if (!closed) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null); server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null);
@ -1238,6 +1239,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
public void close(final boolean failed) { public void close(final boolean failed) {
if (closed) if (closed)
return; return;
if (failed) {
}
context.executeOnCompletion(new IOCallback() { context.executeOnCompletion(new IOCallback() {
@Override @Override
public void onError(int errorCode, String errorMessage) { public void onError(int errorCode, String errorMessage) {

View File

@ -89,4 +89,8 @@ public interface SessionCallback {
* Some protocols (Openwire) needs a special message with the browser is finished. * Some protocols (Openwire) needs a special message with the browser is finished.
*/ */
void browserFinished(ServerConsumer consumer); void browserFinished(ServerConsumer consumer);
default void close(boolean failed) {
}
} }

View File

@ -52,7 +52,7 @@ public class ReceiveNoWaitTest extends JMSTestBase {
public void testReceiveNoWait() throws Exception { public void testReceiveNoWait() throws Exception {
assertNotNull(queue); assertNotNull(queue);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 1000; i++) {
Connection connection = cf.createConnection(); Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);