ARTEMIS-3593 Defense against OME on parsing XID

Co-authored-by: Viktor Kolomeyko <Viktor.Kolomeyko@r3.com>
This commit is contained in:
Clebert Suconic 2021-12-02 17:02:56 -05:00 committed by clebertsuconic
parent 4196faf7ce
commit 72a4fff167
5 changed files with 293 additions and 5 deletions

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
/**
* Exception used for when reading invalid data from buffers.
*/
public class ActiveMQInvalidBufferException extends RuntimeException {
private static final long serialVersionUID = 7048111433271717514L;
public ActiveMQInvalidBufferException(String message) {
super(message);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.utils;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInvalidBufferException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@ -182,5 +183,27 @@ public class BufferHelper {
// perhaps we could optimize it and remove it, but that would break compatibility with older clients and journal
return DataConstants.SIZE_INT + sizeOfSimpleString(s);
}
public static byte[] safeReadBytes(final ActiveMQBuffer in) {
final int claimedSize = in.readInt();
if (claimedSize < 0) {
throw new ActiveMQInvalidBufferException("Payload size cannot be negative");
}
final int readableBytes = in.readableBytes();
// We have to be defensive here and not try to allocate byte buffer straight from information available in the
// stream. Or else, an adversary may handcraft the packet causing OOM situation for a running JVM.
if (claimedSize > readableBytes) {
throw new ActiveMQInvalidBufferException("Attempted to read: " + claimedSize +
" which exceeds overall readable buffer size of: " + readableBytes);
}
final byte[] byteBuffer = new byte[claimedSize];
in.readBytes(byteBuffer);
return byteBuffer;
}
}

View File

@ -21,8 +21,9 @@ import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
public class XidCodecSupport {
import static org.apache.activemq.artemis.utils.BufferHelper.safeReadBytes;
public class XidCodecSupport {
public static void encodeXid(final Xid xid, final ActiveMQBuffer out) {
out.writeInt(xid.getFormatId());
@ -34,10 +35,8 @@ public class XidCodecSupport {
public static Xid decodeXid(final ActiveMQBuffer in) {
int formatID = in.readInt();
byte[] bq = new byte[in.readInt()];
in.readBytes(bq);
byte[] gtxid = new byte[in.readInt()];
in.readBytes(gtxid);
byte[] bq = safeReadBytes(in);
byte[] gtxid = safeReadBytes(in);
return new XidImpl(bq, formatID, gtxid);
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.util;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.XidCodecSupport;
import org.apache.activemq.artemis.api.core.ActiveMQInvalidBufferException;
import org.junit.Test;
import javax.transaction.xa.Xid;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.fail;
public class XidCodecSupportTest {
private static final Xid VALID_XID =
new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
@Test
public void testEncodeDecode() {
final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(0);
XidCodecSupport.encodeXid(VALID_XID, buffer);
assertThat(buffer.readableBytes(), equalTo(51)); // formatId(4) + branchQualLength(4) + branchQual(3) +
// globalTxIdLength(4) + globalTx(36)
final Xid readXid = XidCodecSupport.decodeXid(buffer);
assertThat(readXid, equalTo(VALID_XID));
}
@Test
public void testNegativeLength() {
final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(0);
XidCodecSupport.encodeXid(VALID_XID, buffer);
// Alter branchQualifierLength to be negative
buffer.setByte(4, (byte) 0xFF);
try {
XidCodecSupport.decodeXid(buffer);
fail("Should have thrown");
} catch (ActiveMQInvalidBufferException ex) {
return;
}
fail("should have thrown exception");
}
@Test(expected = ActiveMQInvalidBufferException.class)
public void testOverflowLength() {
final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(0);
XidCodecSupport.encodeXid(VALID_XID, buffer);
// Alter globalTxIdLength to be too big
buffer.setByte(11, (byte) 0x0C);
XidCodecSupport.decodeXid(buffer);
}
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.XidCodecSupport;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class InvalidCoreClientTest extends ActiveMQTestBase {
private static final Logger log = Logger.getLogger(InvalidCoreClientTest.class);
private final Map<String, AddressSettings> addressSettings = new HashMap<>();
private final SimpleString atestq = new SimpleString("BasicXaTestq");
private ActiveMQServer messagingService;
private ClientSession clientSession;
private ClientSessionFactory sessionFactory;
private Configuration configuration;
private ServerLocator locator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
addressSettings.clear();
configuration = createDefaultNettyConfig();
messagingService = createServer(true, configuration, -1, -1, addressSettings);
// start the server
messagingService.start();
locator = createInVMNonHALocator();
sessionFactory = createSessionFactory(locator);
clientSession = addClientSession(sessionFactory.createSession(true, false, false));
clientSession.createQueue(new QueueConfiguration(atestq));
}
@Test
public void testInvalidBufferXIDInvalidSize() throws Exception {
internalTestInvalidXID(false);
}
@Test
public void testInvalidBufferXIDNegative() throws Exception {
internalTestInvalidXID(true);
}
private void internalTestInvalidXID(boolean useNegative) throws Exception {
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientProducer clientProducer = clientSession2.createProducer(atestq);
ClientMessage m1 = createTextMessage(clientSession2, "m1");
clientProducer.send(m1);
Xid xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage message = clientConsumer.receive(5000);
message.acknowledge();
clientSession.end(xid, XAResource.TMSUCCESS);
Channel channel = ((ActiveMQSessionContext) (((ClientSessionImpl) clientSession).getSessionContext())).getSessionChannel();
AtomicInteger connFailure = new AtomicInteger(0);
clientSession.addFailureListener(new SessionFailureListener() {
@Override
public void beforeReconnect(ActiveMQException exception) {
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
connFailure.incrementAndGet();
}
});
SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid) {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
ActiveMQBuffer bufferTmp = ActiveMQBuffers.dynamicBuffer(255);
XidCodecSupport.encodeXid(xid, bufferTmp);
if (useNegative) {
bufferTmp.setByte(4, (byte) 0x0F);
} else {
bufferTmp.setByte(4, (byte) 0xFF);
}
byte[] bytes = new byte[bufferTmp.readableBytes()];
bufferTmp.readBytes(bytes);
buffer.writeBytes(bytes);
}
};
try {
channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
Assert.fail("Failure expected");
} catch (Exception failed) {
}
// the connection was supposed to fail on disconnect
Wait.assertEquals(1, connFailure::get);
}
}