From 579d6226aa28b79ca15f5cf8ab3cc50415180656 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 8 Aug 2016 19:29:27 -0400 Subject: [PATCH] ARTEMIS-671 Returning messages after connection killed, and validating usage of reconnect --- .../api/core/ActiveMQExceptionType.java | 6 + .../ActiveMQRemoteDisconnectException.java | 33 +++ .../artemis/logs/AssertionLoggerHandler.java | 13 ++ .../core/impl/RemotingConnectionImpl.java | 16 +- .../impl/netty/ActiveMQChannelHandler.java | 2 + .../protocol/AbstractRemotingConnection.java | 5 + .../spi/core/protocol/RemotingConnection.java | 7 + .../core/protocol/mqtt/MQTTConnection.java | 5 + .../core/protocol/stomp/StompConnection.java | 5 + .../server/impl/RemotingServiceImpl.java | 28 +-- .../clientcrash/PendingDeliveriesTest.java | 206 ++++++++++++++++++ .../artemis/tests/util/SpawnedVMSupport.java | 11 +- 12 files changed, 305 insertions(+), 32 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQRemoteDisconnectException.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index eb4bf5df0c..254d74ce46 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -172,6 +172,12 @@ public enum ActiveMQExceptionType { return new ActiveMQInvalidTransientQueueUseException(msg); } }, + REMOTE_DISCONNECT(119) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQRemoteDisconnectException(msg); + } + }, GENERIC_EXCEPTION(999), NATIVE_ERROR_INTERNAL(200), diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQRemoteDisconnectException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQRemoteDisconnectException.java new file mode 100644 index 0000000000..9d44b7d557 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQRemoteDisconnectException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.REMOTE_DISCONNECT; + +/** + * A security problem occurred (authentication issues, permission issues,...) + */ +public final class ActiveMQRemoteDisconnectException extends ActiveMQException { + + public ActiveMQRemoteDisconnectException() { + super(REMOTE_DISCONNECT); + } + + public ActiveMQRemoteDisconnectException(String msg) { + super(REMOTE_DISCONNECT, msg); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java index e0c1215dba..d7d9214b8f 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java @@ -71,6 +71,19 @@ public class AssertionLoggerHandler extends ExtHandler { return false; } + public static boolean findText(long mstimeout, String ... text) { + + long timeMax = System.currentTimeMillis() + mstimeout; + do { + if (findText(text)) { + return true; + } + } + while (timeMax > System.currentTimeMillis()); + + return false; + + } /** * Find a line that contains the parameters passed as an argument * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index f7dfa32281..2a3522f951 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; @@ -191,7 +192,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement destroyed = true; } - ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + if (!(me instanceof ActiveMQRemoteDisconnectException)) { + ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + } try { transportConnection.forceClose(); @@ -329,6 +332,17 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement return getTransportConnection().getDefaultActiveMQPrincipal(); } + @Override + public boolean isSupportReconnect() { + for (Channel channel : channels.values()) { + if (channel.getConfirmationWindowSize() > 0) { + return true; + } + } + + return false; + } + // Buffer Handler implementation // ---------------------------------------------------- @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java index d963d1d698..c581a5a24b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java @@ -77,6 +77,8 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler { active = false; } } + + super.channelInactive(ctx); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java index b7c0d17c40..c4387662d4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java @@ -201,6 +201,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection { return res; } + @Override + public boolean isSupportReconnect() { + return false; + } + /* * This can be called concurrently by more than one thread so needs to be locked */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java index fe1a087aee..0f16db7c17 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java @@ -189,4 +189,11 @@ public interface RemotingConnection extends BufferHandler { *if slow consumer is killed,send the msessage to client. */ void killMessage(SimpleString nodeID); + + /** + * This will check if reconnects are supported on the protocol and configuration. + * In case it's not supported a connection failure could remove messages right away from pending deliveries. + * @return + */ + boolean isSupportReconnect(); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index f651d3d964..aa87bd8e6e 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -216,4 +216,9 @@ public class MQTTConnection implements RemotingConnection { public void killMessage(SimpleString nodeID) { //unsupported } + + @Override + public boolean isSupportReconnect() { + return false; + } } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 475a34c0a5..36f440c34b 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -96,6 +96,11 @@ public final class StompConnection implements RemotingConnection { private final int minLargeMessageSize; + @Override + public boolean isSupportReconnect() { + return false; + } + public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { StompFrame frame = null; try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 60ac9a0c01..4ff356e56e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -40,13 +40,13 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory; -import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; @@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServiceRegistry; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; -import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -536,29 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif ConnectionEntry conn = connections.get(connectionID); - if (conn != null) { - // Bit of a hack - find a better way to do this + if (conn != null && !conn.connection.isSupportReconnect()) { + removeConnection(connectionID); - List failureListeners = conn.connection.getFailureListeners(); - - boolean empty = true; - - for (FailureListener listener : failureListeners) { - if (listener instanceof ServerSessionImpl) { - empty = false; - - break; - } - } - - // We only destroy the connection if the connection has no sessions attached to it - // Otherwise it means the connection has died without the sessions being closed first - // so we need to keep them for ttl, in case re-attachment occurs - if (empty) { - removeConnection(connectionID); - - conn.connection.destroy(); - } + conn.connection.fail(new ActiveMQRemoteDisconnectException()); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java new file mode 100644 index 0000000000..fa497804b1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.clientcrash; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.SpawnedVMSupport; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class PendingDeliveriesTest extends ClientTestBase { + + + @Before + public void createQueue() throws Exception { + server.createQueue(SimpleString.toSimpleString("jms.queue.queue1"), SimpleString.toSimpleString("jms.queue.queue1"), null, true, false); + } + + @After + public void clearLogger() throws Exception { + System.out.println("After clearing"); + AssertionLoggerHandler.stopCapture(); + AssertionLoggerHandler.clear(); + } + + private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false"; + private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1"; + private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024); + + public static void main(String[] arg) { + if (arg.length != 3) { + System.err.println("Usage:: URI destinationName cleanShutdown"); + System.exit(-1); + } + + + String uri = arg[0]; + String destinationName = arg[1]; + boolean cleanShutdown = Boolean.valueOf(arg[2]); + + + ConnectionFactory factory; + + factory = createCF(uri); + + try { + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + + System.err.println("***** " + destination); + connection.start(); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage("hello")); + } + + System.err.println("CleanShutdown::" + cleanShutdown); + + if (cleanShutdown) { + consumer.close(); + connection.close(); + } + + System.exit(0); + + } + catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + + + } + + private static ConnectionFactory createCF(String uri) { + ConnectionFactory factory; + if (uri.startsWith("amqp")) { + factory = new JmsConnectionFactory(uri); + } + else { + factory = new ActiveMQConnectionFactory(uri); + } + return factory; + } + + @Test + public void testWithoutReconnect() throws Exception { + + internalNoReconnect(AMQP_URI, "jms.queue.queue1"); + internalNoReconnect(CORE_URI_NO_RECONNECT, "queue1"); + } + + private void internalNoReconnect(String uriToUse, String destinationName) throws Exception { + startClient(uriToUse, destinationName, true, false); + + ConnectionFactory cf = createCF(uriToUse); + Connection connection = cf.createConnection(); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + + for (int i = 0; i < 100; i++) { + Assert.assertNotNull(consumer.receive(1000)); + } + } + finally { + connection.stop(); + connection.close(); + + } + + if (cf instanceof ActiveMQConnectionFactory) { + ((ActiveMQConnectionFactory)cf).close(); + } + + } + + + @Test + public void testWithtReconnect() throws Exception { + startClient(CORE_URI_WITH_RECONNECT, "queue1", true, false); + ConnectionFactory cf = createCF(CORE_URI_WITH_RECONNECT); + Connection connection = cf.createConnection(); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("queue1"); + MessageConsumer consumer = session.createConsumer(destination); + + int i = 0; + for (; i < 100; i++) { + Message msg = consumer.receive(100); + if (msg == null) { + break; + } + } + + Assert.assertTrue(i < 100); + } + finally { + connection.stop(); + connection.close(); + + } + } + + + @Test + public void testCleanShutdownNoLogger() throws Exception { + AssertionLoggerHandler.startCapture(); + startClient(CORE_URI_NO_RECONNECT, "queue1", false, true); + Thread.sleep(500); + Assert.assertFalse(AssertionLoggerHandler.findText("clearing up resources")); + } + + @Test + public void testBadShutdownLogger() throws Exception { + AssertionLoggerHandler.startCapture(); + startClient(CORE_URI_NO_RECONNECT, "queue1", false, false); + Assert.assertTrue(AssertionLoggerHandler.findText(1000, "clearing up resources")); + } + + + @Test + public void testCleanShutdown() throws Exception { + + } + + private void startClient(String uriToUse, String destinationName, boolean log, boolean cleanShutdown) throws Exception { + Process process = SpawnedVMSupport.spawnVM(PendingDeliveriesTest.class.getName(), log, uriToUse, destinationName, Boolean.toString(cleanShutdown)); + Assert.assertEquals(0, process.waitFor()); + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java index dc81bc6933..0ae36b5f3d 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java @@ -119,10 +119,7 @@ public final class SpawnedVMSupport { Process process = builder.start(); - if (logOutput) { - SpawnedVMSupport.startLogger(wordMatch, wordRunning, className, process); - - } + SpawnedVMSupport.startLogger(logOutput, wordMatch, wordRunning, className, process); // Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread: // http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815 @@ -138,8 +135,8 @@ public final class SpawnedVMSupport { * @param process * @throws ClassNotFoundException */ - public static void startLogger(final String wordMatch, final Runnable wordRunanble, final String className, final Process process) throws ClassNotFoundException { - ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), className, wordMatch, wordRunanble); + public static void startLogger(final boolean print, final String wordMatch, final Runnable wordRunanble, final String className, final Process process) throws ClassNotFoundException { + ProcessLogger outputLogger = new ProcessLogger(print, process.getInputStream(), className, wordMatch, wordRunanble); outputLogger.start(); } @@ -149,7 +146,7 @@ public final class SpawnedVMSupport { * @throws ClassNotFoundException */ public static void startLogger(final String className, final Process process) throws ClassNotFoundException { - startLogger(null, null, className, process); + startLogger(true, null, null, className, process); } /**