diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 6382147130..bdb4bd1631 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -451,4 +451,7 @@ public interface ActiveMQClientLogger extends BasicLogger { @Message(id = 214030, value = "Failed to bind {0}={1}", format = Message.Format.MESSAGE_FORMAT) void failedToBind(String p1, String p2, @Cause Throwable cause); + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 214031, value = "Failed to decode buffer, disconnect immediately.", format = Message.Format.MESSAGE_FORMAT) + void disconnectOnErrorDecoding(@Cause Throwable cause); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 8f6a5eac3c..38da780bc7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -1140,7 +1140,17 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C RemotingConnection theConn = connection; if (theConn != null && connectionID.equals(theConn.getID())) { - theConn.bufferReceived(connectionID, buffer); + try { + theConn.bufferReceived(connectionID, buffer); + } catch (final RuntimeException e) { + ActiveMQClientLogger.LOGGER.disconnectOnErrorDecoding(e); + threadPool.execute(new Runnable() { + @Override + public void run() { + theConn.fail(new ActiveMQException(e.getMessage())); + } + }); + } } else { logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet"); } diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java index 5a0d5141dd..fea3bf45f8 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java @@ -17,17 +17,24 @@ package org.apache.activemq.artemis.tests.extras.byteman; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,6 +44,13 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase { private static AtomicBoolean corruptPacket = new AtomicBoolean(false); + @After + @Override + public void tearDown() throws Exception { + corruptPacket.set(false); + super.tearDown(); + } + @Test @BMRules( rules = {@BMRule( @@ -71,6 +85,56 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase { } } + @Test + @BMRules( + rules = {@BMRule( + name = "Corrupt Decoding", + targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder", + targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")}) + public void testClientDisconnect() throws Exception { + Queue q1 = createQueue("queue1"); + final Connection connection = nettyCf.createConnection(); + final CountDownLatch latch = new CountDownLatch(1); + + try { + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException e) { + latch.countDown(); + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(q1); + TextMessage m = session.createTextMessage("hello"); + producer.send(m); + connection.start(); + + corruptPacket.set(true); + MessageConsumer consumer = session.createConsumer(q1); + consumer.receive(2000); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } finally { + corruptPacket.set(false); + + if (connection != null) { + connection.close(); + } + } + } + + public static void doThrow(ActiveMQBuffer buff) { + byte type = buff.getByte(buff.readerIndex()); + if (corruptPacket.get() && type == PacketImpl.SESS_RECEIVE_MSG) { + corruptPacket.set(false); + throw new IllegalArgumentException("Invalid type: -84"); + } + } + public static void doThrow() { if (corruptPacket.get()) { corruptPacket.set(false);