mirror of https://github.com/apache/activemq.git
Add some additional checks and handlers for frames with an invalid size prefix and ensure that the connection state is torn down broker side.
This commit is contained in:
parent
9445e93ae4
commit
a5c2f3f423
|
@ -104,6 +104,8 @@ public class AmqpWireFormat implements WireFormat {
|
|||
int size = dataIn.readInt();
|
||||
if (size > maxFrameSize) {
|
||||
throw new AmqpProtocolException("Frame size exceeded max frame length.");
|
||||
} else if (size <= 0) {
|
||||
throw new AmqpProtocolException("Frame size value was invalid: " + size);
|
||||
}
|
||||
Buffer frame = new Buffer(size);
|
||||
frame.bigEndianEditor().writeInt(size);
|
||||
|
|
|
@ -497,6 +497,15 @@ public class AmqpConnection implements AmqpProtocolConverter {
|
|||
public void onAMQPException(IOException error) {
|
||||
closedSocket = true;
|
||||
if (!closing) {
|
||||
try {
|
||||
closing = true;
|
||||
// Attempt to inform the other end that we are going to close
|
||||
// so that the client doesn't wait around forever.
|
||||
protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage()));
|
||||
protonConnection.close();
|
||||
pumpProtonToSocket();
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
amqpTransport.sendToActiveMQ(error);
|
||||
} else {
|
||||
try {
|
||||
|
|
|
@ -33,8 +33,13 @@ import org.junit.Test;
|
|||
*/
|
||||
public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
|
||||
|
||||
@Override
|
||||
protected String getAdditionalConfig() {
|
||||
return "?transport.wireFormat.maxFrameSize=65535";
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testCanConnect() throws Exception {
|
||||
public void testHandlingCorruptedFramePayload() throws Exception {
|
||||
Random random = new Random();
|
||||
random.setSeed(System.nanoTime());
|
||||
|
||||
|
@ -65,5 +70,88 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
|
|||
}));
|
||||
|
||||
connection.close();
|
||||
|
||||
// Should be able to recycle the client ID now.
|
||||
connection = client.createConnection();
|
||||
connection.setContainerId("ClientID:" + getTestName());
|
||||
connection.connect();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testHandleFrameWithNegativeSize() throws Exception {
|
||||
Random random = new Random();
|
||||
random.setSeed(System.nanoTime());
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
|
||||
connection.setContainerId("ClientID:" + getTestName());
|
||||
connection.connect();
|
||||
|
||||
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
|
||||
|
||||
// Send frame with valid size prefix, but corrupted payload.
|
||||
byte[] corruptedFrame = new byte[1024];
|
||||
random.nextBytes(corruptedFrame);
|
||||
corruptedFrame[0] = (byte) 0xFF;
|
||||
corruptedFrame[1] = 0x0;
|
||||
corruptedFrame[2] = 0x4;
|
||||
corruptedFrame[3] = 0x0;
|
||||
|
||||
connection.sendRawBytes(corruptedFrame);
|
||||
|
||||
assertTrue("Connection should have dropped.", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 0;
|
||||
}
|
||||
}));
|
||||
|
||||
connection.close();
|
||||
|
||||
// Should be able to recycle the client ID now.
|
||||
connection = client.createConnection();
|
||||
connection.setContainerId("ClientID:" + getTestName());
|
||||
connection.connect();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testHandleFrameSizeExceedsMaxFrameSize() throws Exception {
|
||||
Random random = new Random();
|
||||
random.setSeed(System.nanoTime());
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
|
||||
connection.setContainerId("ClientID:" + getTestName());
|
||||
connection.connect();
|
||||
|
||||
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
|
||||
|
||||
// Send frame with valid size prefix, but corrupted payload.
|
||||
byte[] corruptedFrame = new byte[1024];
|
||||
random.nextBytes(corruptedFrame);
|
||||
corruptedFrame[0] = 0x0;
|
||||
corruptedFrame[1] = 0x7F;
|
||||
corruptedFrame[2] = 0x7F;
|
||||
corruptedFrame[3] = 0x7F;
|
||||
|
||||
connection.sendRawBytes(corruptedFrame);
|
||||
|
||||
assertTrue("Connection should have dropped.", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 0;
|
||||
}
|
||||
}));
|
||||
|
||||
connection.close();
|
||||
|
||||
// Should be able to recycle the client ID now.
|
||||
connection = client.createConnection();
|
||||
connection.setContainerId("ClientID:" + getTestName());
|
||||
connection.connect();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue