diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index cada061561..f4efcb2308 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -359,6 +359,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement super.bufferReceived(connectionID, buffer); } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorDecodingPacket(e); + throw new IllegalStateException(e); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 3e15f3e2fe..9a227bb78a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -626,7 +626,12 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif ConnectionEntry conn = connections.get(connectionID); if (conn != null) { - conn.connection.bufferReceived(connectionID, buffer); + try { + conn.connection.bufferReceived(connectionID, buffer); + } catch (RuntimeException e) { + ActiveMQServerLogger.LOGGER.disconnectCritical("Error decoding buffer", e); + conn.connection.fail(new ActiveMQException(e.getMessage())); + } } else { if (logger.isTraceEnabled()) { logger.trace("ConnectionID = " + connectionID + " was already closed, so ignoring packet"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 46a07df8fc..b96a7a0756 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1287,6 +1287,10 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 222217, value = "Cannot find connector-ref {0}. The cluster-connection {1} will not be deployed.", format = Message.Format.MESSAGE_FORMAT) void connectorRefNotFound(String connectorRef, String clusterConnection); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222218, value = "Server disconnecting: {0}", format = Message.Format.MESSAGE_FORMAT) + void disconnectCritical(String reason, @Cause Exception e); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) @@ -1570,6 +1574,4 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 224075, value = "Cannot find pageTX id = {0}", format = Message.Format.MESSAGE_FORMAT) void journalCannotFindPageTX(Long id); - - } diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java new file mode 100644 index 0000000000..5a0d5141dd --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.extras.byteman; + +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +@RunWith(BMUnitRunner.class) +public class DisconnectOnCriticalFailureTest extends JMSTestBase { + + private static AtomicBoolean corruptPacket = new AtomicBoolean(false); + + @Test + @BMRules( + rules = {@BMRule( + name = "Corrupt Decoding", + targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder", + targetMethod = "decode(byte)", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow();")}) + public void testSendDisconnect() throws Exception { + createQueue("queue1"); + final Connection producerConnection = nettyCf.createConnection(); + final CountDownLatch latch = new CountDownLatch(1); + + try { + producerConnection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException e) { + latch.countDown(); + } + }); + + corruptPacket.set(true); + producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } finally { + corruptPacket.set(false); + + if (producerConnection != null) { + producerConnection.close(); + } + } + } + + public static void doThrow() { + if (corruptPacket.get()) { + corruptPacket.set(false); + throw new IllegalArgumentException("Invalid type: -84"); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java index 196ea9795c..44ab399530 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java @@ -59,6 +59,7 @@ public class JMSTestBase extends ActiveMQTestBase { protected MBeanServer mbeanServer; protected ConnectionFactory cf; + protected ConnectionFactory nettyCf; protected Connection conn; private final Set contextSet = new HashSet<>(); private final Random random = new Random(); @@ -130,7 +131,7 @@ public class JMSTestBase extends ActiveMQTestBase { Configuration config = createDefaultConfig(true).setSecurityEnabled(useSecurity()). addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)). setTransactionTimeoutScanPeriod(100); - + config.getConnectorConfigurations().put("netty", new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, usePersistence())); jmsServer = new JMSServerManagerImpl(server); namingContext = new InVMNamingContext(); @@ -197,21 +198,34 @@ public class JMSTestBase extends ActiveMQTestBase { List connectorConfigs = new ArrayList<>(); connectorConfigs.add(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + List connectorConfigs1 = new ArrayList<>(); + connectorConfigs1.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); + createCF(connectorConfigs, "/cf"); + createCF("NettyCF", connectorConfigs1, "/nettyCf"); cf = (ConnectionFactory) namingContext.lookup("/cf"); + nettyCf = (ConnectionFactory)namingContext.lookup("/nettyCf"); } + protected void createCF(final List connectorConfigs, + final String... jndiBindings) throws Exception { + createCF(name.getMethodName(), connectorConfigs, jndiBindings); + } + + /** - * @param connectorConfigs - * @param jndiBindings + * @param cfName the unique ConnectionFactory's name + * @param connectorConfigs initial static connectors' config + * @param jndiBindings JNDI binding names for the CF * @throws Exception */ - protected void createCF(final List connectorConfigs, + protected void createCF(final String cfName, + final List connectorConfigs, final String... jndiBindings) throws Exception { List connectorNames = registerConnectors(server, connectorConfigs); - ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name.getMethodName()).setConnectorNames(connectorNames).setRetryInterval(1000).setReconnectAttempts(-1); + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(cfName).setConnectorNames(connectorNames).setRetryInterval(1000).setReconnectAttempts(-1); testCaseCfExtraConfig(configuration); jmsServer.createConnectionFactory(false, configuration, jndiBindings); }