ARTEMIS-1240 Disconnect at client side on decoding error
When a broken packet arrives at client side it causes decoding error. Currently artemis doesn't handle it properly. It should catch such errors and disconnect the underlying connection, logging a proper warning message
This commit is contained in:
parent
498a328635
commit
3d0896f87c
|
@ -451,4 +451,7 @@ public interface ActiveMQClientLogger extends BasicLogger {
|
||||||
@Message(id = 214030, value = "Failed to bind {0}={1}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 214030, value = "Failed to bind {0}={1}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void failedToBind(String p1, String p2, @Cause Throwable cause);
|
void failedToBind(String p1, String p2, @Cause Throwable cause);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
|
@Message(id = 214031, value = "Failed to decode buffer, disconnect immediately.", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void disconnectOnErrorDecoding(@Cause Throwable cause);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1140,7 +1140,17 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
RemotingConnection theConn = connection;
|
RemotingConnection theConn = connection;
|
||||||
|
|
||||||
if (theConn != null && connectionID.equals(theConn.getID())) {
|
if (theConn != null && connectionID.equals(theConn.getID())) {
|
||||||
theConn.bufferReceived(connectionID, buffer);
|
try {
|
||||||
|
theConn.bufferReceived(connectionID, buffer);
|
||||||
|
} catch (final RuntimeException e) {
|
||||||
|
ActiveMQClientLogger.LOGGER.disconnectOnErrorDecoding(e);
|
||||||
|
threadPool.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
theConn.fail(new ActiveMQException(e.getMessage()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
|
logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,17 +17,24 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.tests.extras.byteman;
|
package org.apache.activemq.artemis.tests.extras.byteman;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMRule;
|
import org.jboss.byteman.contrib.bmunit.BMRule;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMRules;
|
import org.jboss.byteman.contrib.bmunit.BMRules;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
|
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ExceptionListener;
|
import javax.jms.ExceptionListener;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -37,6 +44,13 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase {
|
||||||
|
|
||||||
private static AtomicBoolean corruptPacket = new AtomicBoolean(false);
|
private static AtomicBoolean corruptPacket = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@After
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
corruptPacket.set(false);
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BMRules(
|
@BMRules(
|
||||||
rules = {@BMRule(
|
rules = {@BMRule(
|
||||||
|
@ -71,6 +85,56 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@BMRules(
|
||||||
|
rules = {@BMRule(
|
||||||
|
name = "Corrupt Decoding",
|
||||||
|
targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder",
|
||||||
|
targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)",
|
||||||
|
targetLocation = "ENTRY",
|
||||||
|
action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")})
|
||||||
|
public void testClientDisconnect() throws Exception {
|
||||||
|
Queue q1 = createQueue("queue1");
|
||||||
|
final Connection connection = nettyCf.createConnection();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection.setExceptionListener(new ExceptionListener() {
|
||||||
|
@Override
|
||||||
|
public void onException(JMSException e) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(q1);
|
||||||
|
TextMessage m = session.createTextMessage("hello");
|
||||||
|
producer.send(m);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
corruptPacket.set(true);
|
||||||
|
MessageConsumer consumer = session.createConsumer(q1);
|
||||||
|
consumer.receive(2000);
|
||||||
|
|
||||||
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
} finally {
|
||||||
|
corruptPacket.set(false);
|
||||||
|
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void doThrow(ActiveMQBuffer buff) {
|
||||||
|
byte type = buff.getByte(buff.readerIndex());
|
||||||
|
if (corruptPacket.get() && type == PacketImpl.SESS_RECEIVE_MSG) {
|
||||||
|
corruptPacket.set(false);
|
||||||
|
throw new IllegalArgumentException("Invalid type: -84");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void doThrow() {
|
public static void doThrow() {
|
||||||
if (corruptPacket.get()) {
|
if (corruptPacket.get()) {
|
||||||
corruptPacket.set(false);
|
corruptPacket.set(false);
|
||||||
|
|
Loading…
Reference in New Issue