From 72a4fff1673477d78a85c415d48a2c74afda81fa Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 2 Dec 2021 17:02:56 -0500 Subject: [PATCH] ARTEMIS-3593 Defense against OME on parsing XID Co-authored-by: Viktor Kolomeyko --- .../core/ActiveMQInvalidBufferException.java | 29 ++++ .../activemq/artemis/utils/BufferHelper.java | 23 +++ .../artemis/utils/XidCodecSupport.java | 9 +- .../artemis/util/XidCodecSupportTest.java | 76 +++++++++ .../client/InvalidCoreClientTest.java | 161 ++++++++++++++++++ 5 files changed, 293 insertions(+), 5 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidBufferException.java create mode 100644 artemis-core-client/src/test/java/org/apache/activemq/artemis/util/XidCodecSupportTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InvalidCoreClientTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidBufferException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidBufferException.java new file mode 100644 index 0000000000..fcedf21d33 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidBufferException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +/** + * Exception used for when reading invalid data from buffers. + */ +public class ActiveMQInvalidBufferException extends RuntimeException { + private static final long serialVersionUID = 7048111433271717514L; + + public ActiveMQInvalidBufferException(String message) { + super(message); + } +} \ No newline at end of file diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/BufferHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/BufferHelper.java index 3643d07ca6..ca92d60d58 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/BufferHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/BufferHelper.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.utils; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQInvalidBufferException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; @@ -182,5 +183,27 @@ public class BufferHelper { // perhaps we could optimize it and remove it, but that would break compatibility with older clients and journal return DataConstants.SIZE_INT + sizeOfSimpleString(s); } + + + public static byte[] safeReadBytes(final ActiveMQBuffer in) { + final int claimedSize = in.readInt(); + + if (claimedSize < 0) { + throw new ActiveMQInvalidBufferException("Payload size cannot be negative"); + } + + final int readableBytes = in.readableBytes(); + // We have to be defensive here and not try to allocate byte buffer straight from information available in the + // stream. Or else, an adversary may handcraft the packet causing OOM situation for a running JVM. + if (claimedSize > readableBytes) { + throw new ActiveMQInvalidBufferException("Attempted to read: " + claimedSize + + " which exceeds overall readable buffer size of: " + readableBytes); + } + final byte[] byteBuffer = new byte[claimedSize]; + in.readBytes(byteBuffer); + return byteBuffer; + } + + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XidCodecSupport.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XidCodecSupport.java index f746483fa0..304499e16c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XidCodecSupport.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XidCodecSupport.java @@ -21,8 +21,9 @@ import javax.transaction.xa.Xid; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; -public class XidCodecSupport { +import static org.apache.activemq.artemis.utils.BufferHelper.safeReadBytes; +public class XidCodecSupport { public static void encodeXid(final Xid xid, final ActiveMQBuffer out) { out.writeInt(xid.getFormatId()); @@ -34,10 +35,8 @@ public class XidCodecSupport { public static Xid decodeXid(final ActiveMQBuffer in) { int formatID = in.readInt(); - byte[] bq = new byte[in.readInt()]; - in.readBytes(bq); - byte[] gtxid = new byte[in.readInt()]; - in.readBytes(gtxid); + byte[] bq = safeReadBytes(in); + byte[] gtxid = safeReadBytes(in); return new XidImpl(bq, formatID, gtxid); } diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/util/XidCodecSupportTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/util/XidCodecSupportTest.java new file mode 100644 index 0000000000..4edf65ef33 --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/util/XidCodecSupportTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.util; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.core.transaction.impl.XidImpl; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.XidCodecSupport; +import org.apache.activemq.artemis.api.core.ActiveMQInvalidBufferException; +import org.junit.Test; + +import javax.transaction.xa.Xid; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.fail; + +public class XidCodecSupportTest { + + private static final Xid VALID_XID = + new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); + + @Test + public void testEncodeDecode() { + final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(0); + XidCodecSupport.encodeXid(VALID_XID, buffer); + + assertThat(buffer.readableBytes(), equalTo(51)); // formatId(4) + branchQualLength(4) + branchQual(3) + + // globalTxIdLength(4) + globalTx(36) + + final Xid readXid = XidCodecSupport.decodeXid(buffer); + assertThat(readXid, equalTo(VALID_XID)); + } + + @Test + public void testNegativeLength() { + final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(0); + XidCodecSupport.encodeXid(VALID_XID, buffer); + // Alter branchQualifierLength to be negative + buffer.setByte(4, (byte) 0xFF); + try { + XidCodecSupport.decodeXid(buffer); + fail("Should have thrown"); + } catch (ActiveMQInvalidBufferException ex) { + return; + } + + fail("should have thrown exception"); + } + + @Test(expected = ActiveMQInvalidBufferException.class) + public void testOverflowLength() { + final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(0); + XidCodecSupport.encodeXid(VALID_XID, buffer); + // Alter globalTxIdLength to be too big + buffer.setByte(11, (byte) 0x0C); + + XidCodecSupport.decodeXid(buffer); + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InvalidCoreClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InvalidCoreClientTest.java new file mode 100644 index 0000000000..5361d9d664 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InvalidCoreClientTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.client; + +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.client.SessionFailureListener; +import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.XidCodecSupport; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class InvalidCoreClientTest extends ActiveMQTestBase { + + private static final Logger log = Logger.getLogger(InvalidCoreClientTest.class); + + private final Map addressSettings = new HashMap<>(); + private final SimpleString atestq = new SimpleString("BasicXaTestq"); + private ActiveMQServer messagingService; + private ClientSession clientSession; + private ClientSessionFactory sessionFactory; + private Configuration configuration; + private ServerLocator locator; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + addressSettings.clear(); + + configuration = createDefaultNettyConfig(); + + messagingService = createServer(true, configuration, -1, -1, addressSettings); + + // start the server + messagingService.start(); + + locator = createInVMNonHALocator(); + sessionFactory = createSessionFactory(locator); + + clientSession = addClientSession(sessionFactory.createSession(true, false, false)); + + clientSession.createQueue(new QueueConfiguration(atestq)); + } + + @Test + public void testInvalidBufferXIDInvalidSize() throws Exception { + internalTestInvalidXID(false); + } + + @Test + public void testInvalidBufferXIDNegative() throws Exception { + internalTestInvalidXID(true); + } + + private void internalTestInvalidXID(boolean useNegative) throws Exception { + + ClientSession clientSession2 = sessionFactory.createSession(false, true, true); + ClientProducer clientProducer = clientSession2.createProducer(atestq); + ClientMessage m1 = createTextMessage(clientSession2, "m1"); + clientProducer.send(m1); + + Xid xid = newXID(); + clientSession.start(xid, XAResource.TMNOFLAGS); + clientSession.start(); + + ClientConsumer clientConsumer = clientSession.createConsumer(atestq); + + ClientMessage message = clientConsumer.receive(5000); + message.acknowledge(); + clientSession.end(xid, XAResource.TMSUCCESS); + Channel channel = ((ActiveMQSessionContext) (((ClientSessionImpl) clientSession).getSessionContext())).getSessionChannel(); + + AtomicInteger connFailure = new AtomicInteger(0); + clientSession.addFailureListener(new SessionFailureListener() { + @Override + public void beforeReconnect(ActiveMQException exception) { + + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + connFailure.incrementAndGet(); + } + }); + + SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid) { + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + + ActiveMQBuffer bufferTmp = ActiveMQBuffers.dynamicBuffer(255); + + XidCodecSupport.encodeXid(xid, bufferTmp); + if (useNegative) { + bufferTmp.setByte(4, (byte) 0x0F); + } else { + bufferTmp.setByte(4, (byte) 0xFF); + } + byte[] bytes = new byte[bufferTmp.readableBytes()]; + bufferTmp.readBytes(bytes); + buffer.writeBytes(bytes); + } + }; + + try { + channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + Assert.fail("Failure expected"); + } catch (Exception failed) { + } + + // the connection was supposed to fail on disconnect + Wait.assertEquals(1, connFailure::get); + + } +}