From 18cfdb7049ca90e5c5a08611413f6ba5d60152ff Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 30 Sep 2022 13:36:14 -0400 Subject: [PATCH] ARTEMIS-4024 Avoid excessive NativeMemory allocation when sending OpenWire Multi mega sized messages in openwire --- .../protocol/openwire/OpenWireConnection.java | 28 +- .../openwire/OpenWireFrameParser.java | 99 +++++++ .../openwire/OpenWireProtocolManager.java | 22 +- .../impl/journal/LargeServerMessageImpl.java | 35 ++- .../openwire/OpenWireLargeMessageTest.java | 88 +++--- tests/soak-tests/pom.xml | 21 ++ .../servers/openwire-leaktest/broker.xml | 253 +++++++++++++++++ .../artemis/tests/soak/owleak/OWLeakTest.java | 264 ++++++++++++++++++ .../soak/paging/FlowControlPagingTest.java | 2 +- .../soak/paging/HorizontalPagingTest.java | 2 +- .../soak/paging/SubscriptionPagingTest.java | 2 +- .../src/test/scripts/parameters-paging.sh | 13 +- .../soak-tests/src/test/scripts/parameters.sh | 99 +++++++ 13 files changed, 877 insertions(+), 51 deletions(-) create mode 100644 artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireFrameParser.java create mode 100644 tests/soak-tests/src/main/resources/servers/openwire-leaktest/broker.xml create mode 100644 tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/owleak/OWLeakTest.java create mode 100755 tests/soak-tests/src/test/scripts/parameters.sh diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 3c639fbc13..3e67ef417f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -546,9 +546,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { final ByteSequence bytes = outWireFormat.marshal(command); final int bufferSize = bytes.length; - final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize); - buffer.writeBytes(bytes.data, bytes.offset, bufferSize); - transportConnection.write(buffer, false, false); + final int maxChunkSize = protocolManager.getOpenwireMaxPacketChunkSize(); + + if (maxChunkSize > 0 && bufferSize > maxChunkSize) { + chunkSend(bytes, bufferSize, maxChunkSize); + } else { + final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize); + buffer.writeBytes(bytes.data, bytes.offset, bufferSize); + transportConnection.write(buffer, false, false); + } bufferSent(); } catch (IOException e) { throw e; @@ -558,6 +564,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } + private void chunkSend(final ByteSequence bytes, final int bufferSize, final int maxChunkSize) { + if (logger.isTraceEnabled()) { + logger.trace("Sending a big packet sized as {} with smaller packets of {}", bufferSize, maxChunkSize); + } + while (bytes.remaining() > 0) { + int chunkSize = Math.min(bytes.remaining(), maxChunkSize); + if (logger.isTraceEnabled()) { + logger.trace("Sending a partial packet of {} bytes, starting at {}", chunkSize, bytes.remaining()); + } + final ActiveMQBuffer chunk = transportConnection.createTransportBuffer(chunkSize); + chunk.writeBytes(bytes.data, bytes.offset, chunkSize); + transportConnection.write(chunk, true, false); + bytes.setOffset(bytes.getOffset() + chunkSize); + } + } + public void dispatchAsync(Command message) throws Exception { dispatchSync(message); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireFrameParser.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireFrameParser.java new file mode 100644 index 0000000000..81c429918d --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireFrameParser.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.openwire; + +import java.lang.invoke.MethodHandles; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.activemq.artemis.utils.DataConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** This MessageDecoder is based on LengthFieldBasedFrameDecoder. + * When OpenWire clients send a Large Message (large in the context of size only as openwire does not support message chunk streaming). + * In that context the server will transfer the huge frame to a Heap Buffer, instead of keeping a really large native buffer. + * + * There's a test showing this situation under ./soak-tests named OWLeakTest. The test will send 200MB messages. For every message sent we would have 200MB native buffers + * not leaving much space for the broker to handle its IO as most of the IO needs to be done with Native Memory. + * */ +public class OpenWireFrameParser extends ByteToMessageDecoder { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + final int openwireMaxPacketChunkSize; + + public OpenWireFrameParser(int openwireMaxPacketChunkSize) { + this.openwireMaxPacketChunkSize = openwireMaxPacketChunkSize; + } + + ByteBuf outBuffer; + int bufferSize = -1; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (ctx.isRemoved()) { + return; + } + + if (bufferSize == -1) { + if (in.readableBytes() < DataConstants.SIZE_INT) { + return; + } + + bufferSize = in.getInt(in.readerIndex()) + DataConstants.SIZE_INT; + + if (openwireMaxPacketChunkSize > 0 && bufferSize > openwireMaxPacketChunkSize) { + if (logger.isTraceEnabled()) { + logger.trace("Creating a heapBuffer sized as {} as it is beyond {} chunk limit", bufferSize, openwireMaxPacketChunkSize); + } + // we will use a heap buffer for large frames. + // to avoid competing for resources with the broker on native messages. + // to save the broker in case users send huge messages in openwire. + outBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(bufferSize); + } + } + + if (outBuffer != null) { + + int missingBytes = bufferSize - outBuffer.writerIndex(); + + int bytesToRead = Math.min(missingBytes, in.readableBytes()); + + outBuffer.writeBytes(in, bytesToRead); + + if (outBuffer.writerIndex() == bufferSize) { + out.add(outBuffer); + outBuffer = null; + bufferSize = -1; + } + } else { + if (in.readableBytes() >= bufferSize) { + out.add(in.retainedSlice(in.readerIndex(), bufferSize)); + in.skipBytes(bufferSize); + outBuffer = null; + bufferSize = -1; + } + } + } + +} + diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index f8215c8532..f1fd7fb6e1 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; @@ -56,7 +55,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.CompositeAddress; -import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; @@ -105,6 +103,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager prefixes = new HashMap<>(); @@ -345,8 +360,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager 0) { + wrappedIOBuffer.clear(); // equivalent to setting writingIndex=readerIndex=0; + int bytesToRead = Math.min(CHUNK_LM_SIZE, messageBodyBuffer.readableBytes()); + messageBodyBuffer.readBytes(wrappedIOBuffer, 0, bytesToRead); + wrappedIOBuffer.writerIndex(bytesToRead); + lsm.addBytes(wrappedIOBuffer); + } + } finally { + lsm.releaseResources(true, true); + ioBuffer.release(); + } if (!coreMessage.containsProperty(Message.HDR_LARGE_BODY_SIZE)) { lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java index 014bd7b260..80aecbbe7f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java @@ -18,20 +18,25 @@ package org.apache.activemq.artemis.tests.integration.openwire; import javax.jms.BytesMessage; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import java.util.Map; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -54,23 +59,6 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { server.createQueue(new QueueConfiguration(lmDropAddress).setRoutingType(RoutingType.ANYCAST)); } - @Test - public void testSendLargeMessage() throws Exception { - try (Connection connection = factory.createConnection()) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(lmAddress.toString()); - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - // Create 1MB Message - int size = 1024 * 1024; - byte[] bytes = new byte[size]; - BytesMessage message = session.createBytesMessage(); - message.writeBytes(bytes); - producer.send(message); - } - } - @Override protected void configureAddressSettings(Map addressSettingsMap) { addressSettingsMap.put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true)); @@ -78,17 +66,38 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { new AddressSettings() .setMaxSizeBytes(100 * 1024) .setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP) + .setMaxSizeMessages(2) .setMessageCounterHistoryDayLimit(10) .setRedeliveryDelay(0) .setMaxDeliveryAttempts(0)); } @Test - public void testSendReceiveLargeMessage() throws Exception { - // Create 1MB Message - int size = 1024 * 1024; + public void testSendReceiveLargeMessageRestart() throws Exception { + internalSendReceiveLargeMessage(factory, true); + internalSendReceiveLargeMessage(CFUtil.createConnectionFactory("openwire", "tcp://localhost:61618"), true); + } + + @Test + public void testSendReceiveLargeMessage() throws Exception { + internalSendReceiveLargeMessage(factory, false); + internalSendReceiveLargeMessage(CFUtil.createConnectionFactory("openwire", "tcp://localhost:61618"), false); + } + + private void internalSendReceiveLargeMessage(ConnectionFactory factory, boolean restart) throws Exception { + // Create 1MB Message + String largeString; + + { + String randomString = "This is a random String " + RandomUtil.randomString(); + StringBuffer largeBuffer = new StringBuffer(); + while (largeBuffer.length() < 1024 * 1024) { + largeBuffer.append(randomString); + } + + largeString = largeBuffer.toString(); + } - byte[] bytes = new byte[size]; try (Connection connection = factory.createConnection()) { connection.start(); @@ -98,15 +107,14 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); - bytes[0] = 1; - - BytesMessage message = session.createBytesMessage(); - message.writeBytes(bytes); + TextMessage message = session.createTextMessage(largeString); producer.send(message); } - server.stop(); - server.start(); + if (restart) { + server.stop(); + server.start(); + } try (Connection connection = factory.createConnection()) { connection.start(); @@ -115,13 +123,8 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { MessageConsumer consumer = session.createConsumer(queue); - BytesMessage m = (BytesMessage) consumer.receive(); - assertNotNull(m); - - byte[] body = new byte[size]; - m.readBytes(body); - - assertArrayEquals(body, bytes); + TextMessage m = (TextMessage) consumer.receive(5000); + assertEquals(largeString, m.getText()); } } @@ -129,8 +132,8 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { public void testFastLargeMessageProducerDropOnPaging() throws Exception { AssertionLoggerHandler.startCapture(); try { - // Create 100K Message - int size = 100 * 1024; + // Create 200K Message + int size = 200 * 1024; final byte[] bytes = new byte[size]; @@ -173,4 +176,17 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { AssertionLoggerHandler.stopCapture(); } } + + + + @Override + protected void extraServerConfig(Configuration serverConfig) { + try { + // to validate the server would still work without MaxPackeSize configured + serverConfig.addAcceptorConfiguration("openwire", "tcp://0.0.0.0:61618?OPENWIRE;openwireMaxPacketSize=10 * 1024"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml index caf043ffd9..cd7851e28c 100644 --- a/tests/soak-tests/pom.xml +++ b/tests/soak-tests/pom.xml @@ -183,6 +183,27 @@ + + test-compile + create-openwire-leaktest + + create + + + amq + admin + admin + true + false + ${basedir}/target/openwire-leaktest + ${basedir}/target/classes/servers/openwire-leaktest + + --java-memory + 3G + + + + diff --git a/tests/soak-tests/src/main/resources/servers/openwire-leaktest/broker.xml b/tests/soak-tests/src/main/resources/servers/openwire-leaktest/broker.xml new file mode 100644 index 0000000000..05f9cef629 --- /dev/null +++ b/tests/soak-tests/src/main/resources/servers/openwire-leaktest/broker.xml @@ -0,0 +1,253 @@ + + + + + + + + 0.0.0.0 + + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + + + + + + true + + 2 + + 10 + + 4096 + + 10M + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + true + + 120000 + + 60000 + + HALT + + + + + + + -1 + + + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false;openwireMaxPacketSize=102400 + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true + + + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true + + + tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + + + -1 + + + 1000 + + + 10M + + + -1 + + + 20M + + 10 + PAGE + true + true + false + false + + + + +
+ + + +
+
+ + + +
+
+ + + + +
+
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/owleak/OWLeakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/owleak/OWLeakTest.java new file mode 100644 index 0000000000..79227f545a --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/owleak/OWLeakTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.owleak; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.activemq.artemis.tests.soak.TestParameters.intMandatoryProperty; +import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty; + +/** + * Refer to ./scripts/parameters.sh for suggested parameters + * + * Even though this test is not testing Paging, it will use Page just to generate enough load to the server to compete for resources in Native Buffers. + * + */ +@RunWith(Parameterized.class) +public class OWLeakTest extends SoakTestBase { + + private static final int OK = 33; // arbitrary code. if the spawn returns this the test went fine + + public static final String SERVER_NAME_0 = "openwire-leaktest"; + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String TEST_NAME = "OW_LEAK"; + private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true")); + private static final String PROTOCOL_LIST = testProperty(TEST_NAME, "PROTOCOL_LIST", "OPENWIRE"); + private static final int TEST_TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMETOUT_MINUTES", 10); + private final String protocol; + private final int NUMBER_OF_MESSAGES; + private final int PRODUCERS; + private final int MESSAGE_SIZE; + Process serverProcess; + + public OWLeakTest(String protocol) { + this.protocol = protocol; + NUMBER_OF_MESSAGES = intMandatoryProperty(TEST_NAME, protocol + "_NUMBER_OF_MESSAGES"); + PRODUCERS = intMandatoryProperty(TEST_NAME, protocol + "_PRODUCERS"); + MESSAGE_SIZE = intMandatoryProperty(TEST_NAME, protocol + "_MESSAGE_SIZE"); + } + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection parameters() { + String[] protocols = PROTOCOL_LIST.split(","); + + ArrayList parameters = new ArrayList<>(); + for (String str : protocols) { + logger.debug("Adding {} to the list for the test", str); + parameters.add(new Object[]{str}); + } + + return parameters; + } + + @Before + public void before() throws Exception { + Assume.assumeTrue(TEST_ENABLED); + cleanupData(SERVER_NAME_0); + + serverProcess = startServer(SERVER_NAME_0, 0, 10_000); + } + + + private static String createLMBody(int messageSize, int producer, int sequence) { + StringBuffer buffer = new StringBuffer(); + String baseString = "A Large body from producer " + producer + ", sequence " + sequence; + + while (buffer.length() < messageSize) { + buffer.append(baseString); + } + return buffer.toString(); + } + + + public static void main(String[] arg) { + int PRODUCERS = Integer.parseInt(arg[0]); + int NUMBER_OF_MESSAGES = Integer.parseInt(arg[1]); + int MESSAGE_SIZE = Integer.parseInt(arg[2]); + String protocol = arg[3]; + ExecutorService service = Executors.newFixedThreadPool(PRODUCERS + 1 + 1); + + String QUEUE_NAME = "some_queue"; + + Semaphore semaphore = new Semaphore(PRODUCERS + 1); + + CountDownLatch latch = new CountDownLatch(PRODUCERS + 1 + 1); + + AtomicBoolean running = new AtomicBoolean(true); + + AtomicInteger errors = new AtomicInteger(0); + + try { + + for (int i = 0; i < PRODUCERS; i++) { + final int producerID = i; + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + service.execute(() -> { + try { + for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) { + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + TextMessage message = session.createTextMessage(createLMBody(MESSAGE_SIZE, producerID, msg)); + message.setIntProperty("producerID", producerID); + message.setIntProperty("sequence", msg); + semaphore.acquire(); + producer.send(message); + logger.debug("Thread {} Sent message with size {} with the total number of {} messages of {}", producerID, MESSAGE_SIZE, msg, NUMBER_OF_MESSAGES); + producer.close(); + session.close(); + connection.close(); + } + } catch (Exception e) { + errors.incrementAndGet(); + e.printStackTrace(); + logger.warn(e.getMessage(), e); + } finally { + latch.countDown(); + } + }); + } + + + service.execute(() -> { + + int[] producerSequence = new int[PRODUCERS]; + + try { + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + connection.start(); + + for (int i = 0; i < NUMBER_OF_MESSAGES * PRODUCERS; i++) { + TextMessage message = (TextMessage) consumer.receive(60_000); + Assert.assertNotNull(message); + int producerID = message.getIntProperty("producerID"); + int sequence = message.getIntProperty("sequence"); + logger.debug("Received message {} from producer {}", sequence, producerID); + Assert.assertEquals(producerSequence[producerID], sequence); + producerSequence[producerID]++; + Assert.assertEquals(createLMBody(MESSAGE_SIZE, producerID, sequence), message.getText()); + semaphore.release(); + } + + } catch (Throwable e) { + errors.incrementAndGet(); + logger.warn(e.getMessage(), e); + } finally { + running.set(false); + latch.countDown(); + } + }); + + service.execute(() -> { + // this is just creating enough loading somewhere else to compete for resources + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try { + Connection connection = factory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue("fastQueue")); + MessageConsumer consumer = session.createConsumer(session.createQueue("fastQueue")); + connection.start(); + long msg = 0; + char[] msgStr = new char[1024]; + String buffer = new String(msgStr); + Arrays.fill(msgStr, 'a'); + while (running.get()) { + TextMessage message = session.createTextMessage(buffer); + producer.send(message); + if (++msg % 10000L == 0L) { + logger.debug("Sent and receive {} fast messages", msg); + } + + if (msg > 5000L) { + message = (TextMessage) consumer.receive(10000); + Assert.assertNotNull(message); + } + + if (msg % 100L == 0L) { + session.commit(); + } + } + session.commit(); + producer.close(); + consumer.close(); + session.close(); + connection.close(); + } catch (Exception e) { + errors.incrementAndGet(); + e.printStackTrace(); + logger.warn(e.getMessage(), e); + } finally { + latch.countDown(); + running.set(false); + } + }); + + + Assert.assertTrue(latch.await(TEST_TIMEOUT_MINUTES, TimeUnit.MINUTES)); + + Assert.assertEquals(0, errors.get()); + + System.exit(OK); + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + } + + @Test + public void testValidateLeaks() throws Exception { + // I am using a spawn for the test client, as this test will need a big VM for the client. + // so I need control over the memory size for the VM. + Process process = SpawnedVMSupport.spawnVM(OWLeakTest.class.getName(), new String[]{"-Xmx3G"}, "" + PRODUCERS, "" + NUMBER_OF_MESSAGES, "" + MESSAGE_SIZE, protocol); + logger.debug("Process PID::{}", process.pid()); + Assert.assertTrue(process.waitFor(TEST_TIMEOUT_MINUTES, TimeUnit.MINUTES)); + Assert.assertEquals(OK, process.exitValue()); + + } + +} diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/FlowControlPagingTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/FlowControlPagingTest.java index 4370ecd93f..821ad04d5b 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/FlowControlPagingTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/FlowControlPagingTest.java @@ -48,7 +48,7 @@ import static org.apache.activemq.artemis.tests.soak.TestParameters.intMandatory import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty; /** - * Refer to ./scripts/parameters-paging.sh for suggested parameters + * Refer to ./scripts/parameters.sh for suggested parameters * #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging * export TEST_FLOW_ZIP_LOCATION=a folder */ @RunWith(Parameterized.class) diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java index 6234d59e8f..457deff0e0 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java @@ -49,7 +49,7 @@ import static org.apache.activemq.artemis.tests.soak.TestParameters.intMandatory import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty; /** - * Refer to ./scripts/parameters-paging.sh for suggested parameters + * Refer to ./scripts/parameters.sh for suggested parameters * #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging * export TEST_HORIZONTAL_ZIP_LOCATION=a folder * */ diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/SubscriptionPagingTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/SubscriptionPagingTest.java index ee83111076..297a229938 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/SubscriptionPagingTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/SubscriptionPagingTest.java @@ -51,7 +51,7 @@ import static org.apache.activemq.artemis.tests.soak.TestParameters.intMandatory import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty; /** - * Refer to ./scripts/parameters-paging.sh for suggested parameters + * Refer to ./scripts/parameters.sh for suggested parameters * #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging * export TEST_FLOW_ZIP_LOCATION=a folder */ @RunWith(Parameterized.class) diff --git a/tests/soak-tests/src/test/scripts/parameters-paging.sh b/tests/soak-tests/src/test/scripts/parameters-paging.sh index bc58eeb580..faaf8a7565 100755 --- a/tests/soak-tests/src/test/scripts/parameters-paging.sh +++ b/tests/soak-tests/src/test/scripts/parameters-paging.sh @@ -22,6 +22,8 @@ # It is possible to save the producer's time. If you set this variable the test will reuse previously sent data by zip and unzipping the data folder #export TEST_ZIP_LOCATION=~/zipTest/ +echo "parameters-paging has been deprecated, please use parameters.sh" + #HorizontalPagingTest export TEST_HORIZONTAL_TEST_ENABLED=true @@ -87,4 +89,13 @@ export TEST_SUBSCRIPTION_CORE_MESSAGES=10000 export TEST_SUBSCRIPTION_CORE_COMMIT_INTERVAL=1000 export TEST_SUBSCRIPTION_CORE_RECEIVE_COMMIT_INTERVAL=0 export TEST_SUBSCRIPTION_CORE_MESSAGE_SIZE=30000 -export TEST_SUBSCRIPTION_SLEEP_SLOW=1000 \ No newline at end of file +export TEST_SUBSCRIPTION_SLEEP_SLOW=1000 + + +#OWLeakTest +export TEST_OW_LEAK_TEST_ENABLED=true +export TEST_OW_LEAK_PROTOCOL_LIST=OPENWIRE +export TEST_OW_LEAK_OPENWIRE_NUMBER_OF_MESSAGES=15 +export TEST_OW_LEAK_OPENWIRE_PRODUCERS=1 +export TEST_OW_LEAK_OPENWIRE_MESSAGE_SIZE=200000000 +export TEST_OW_LEAK_PRINT_INTERVAL=1 diff --git a/tests/soak-tests/src/test/scripts/parameters.sh b/tests/soak-tests/src/test/scripts/parameters.sh new file mode 100755 index 0000000000..db95c3776b --- /dev/null +++ b/tests/soak-tests/src/test/scripts/parameters.sh @@ -0,0 +1,99 @@ +#!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# this script contains a suggest set of variables to run the soak tests. + +## Generic variable: +# Some tests will support saving the producer's state before consumption. If you set this variable these tests will hold a zip file and recover it approprieatedly. +#export TEST_ZIP_LOCATION=~/zipTest/ + +#HorizontalPagingTest + +export TEST_HORIZONTAL_TEST_ENABLED=true +export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000 +export TEST_HORIZONTAL_TIMEOUT_MINUTES=120 +export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP + +export TEST_HORIZONTAL_CORE_DESTINATIONS=200 +export TEST_HORIZONTAL_CORE_MESSAGES=1000 +export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=10 + +export TEST_HORIZONTAL_AMQP_DESTINATIONS=200 +export TEST_HORIZONTAL_AMQP_MESSAGES=1000 +export TEST_HORIZONTAL_AMQP_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_AMQP_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_AMQP_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_AMQP_PARALLEL_SENDS=10 + +export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=200 +export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000 +export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10 + +export TEST_FLOW_SERVER_START_TIMEOUT=300000 +export TEST_FLOW_TIMEOUT_MINUTES=120 + + +# FlowControlPagingTest +export TEST_FLOW_PROTOCOL_LIST=CORE,AMQP,OPENWIRE +export TEST_FLOW_PRINT_INTERVAL=100 + +export TEST_FLOW_OPENWIRE_MESSAGES=10000 +export TEST_FLOW_OPENWIRE_COMMIT_INTERVAL=1000 +export TEST_FLOW_OPENWIRE_RECEIVE_COMMIT_INTERVAL=10 +export TEST_FLOW_OPENWIRE_MESSAGE_SIZE=60000 + +export TEST_FLOW_CORE_MESSAGES=10000 +export TEST_FLOW_CORE_COMMIT_INTERVAL=1000 +export TEST_FLOW_CORE_RECEIVE_COMMIT_INTERVAL=10 +export TEST_FLOW_CORE_MESSAGE_SIZE=30000 + +export TEST_FLOW_AMQP_MESSAGES=10000 +export TEST_FLOW_AMQP_COMMIT_INTERVAL=1000 +export TEST_FLOW_AMQP_RECEIVE_COMMIT_INTERVAL=10 +export TEST_FLOW_AMQP_MESSAGE_SIZE=30000 + + +# SubscriptionPagingTest +export TEST_SUBSCRIPTION_PROTOCOL_LIST=CORE + +export TEST_SUBSCRIPTION_SERVER_START_TIMEOUT=300000 +export TEST_SUBSCRIPTION_TIMEOUT_MINUTES=120 +export TEST_SUBSCRIPTION_PRINT_INTERVAL=100 +export TEST_SUBSCRIPTION_SLOW_SUBSCRIPTIONS=5 + + +export TEST_SUBSCRIPTION_CORE_MESSAGES=10000 +export TEST_SUBSCRIPTION_CORE_COMMIT_INTERVAL=1000 +export TEST_SUBSCRIPTION_CORE_RECEIVE_COMMIT_INTERVAL=0 +export TEST_SUBSCRIPTION_CORE_MESSAGE_SIZE=30000 +export TEST_SUBSCRIPTION_SLEEP_SLOW=1000 + + +#OWLeakTest +export TEST_OW_LEAK_TEST_ENABLED=true +export TEST_OW_LEAK_PROTOCOL_LIST=OPENWIRE +export TEST_OW_LEAK_OPENWIRE_NUMBER_OF_MESSAGES=15 +export TEST_OW_LEAK_OPENWIRE_PRODUCERS=1 +export TEST_OW_LEAK_OPENWIRE_MESSAGE_SIZE=200000000 +export TEST_OW_LEAK_PRINT_INTERVAL=1