ARTEMIS-4024 Avoid excessive NativeMemory allocation when sending OpenWire Multi mega sized messages in openwire

This commit is contained in:
Clebert Suconic 2022-09-30 13:36:14 -04:00 committed by clebertsuconic
parent f465073744
commit 18cfdb7049
13 changed files with 877 additions and 51 deletions

View File

@ -546,9 +546,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
try {
final ByteSequence bytes = outWireFormat.marshal(command);
final int bufferSize = bytes.length;
final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize);
buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
transportConnection.write(buffer, false, false);
final int maxChunkSize = protocolManager.getOpenwireMaxPacketChunkSize();
if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
chunkSend(bytes, bufferSize, maxChunkSize);
} else {
final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize);
buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
transportConnection.write(buffer, false, false);
}
bufferSent();
} catch (IOException e) {
throw e;
@ -558,6 +564,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private void chunkSend(final ByteSequence bytes, final int bufferSize, final int maxChunkSize) {
if (logger.isTraceEnabled()) {
logger.trace("Sending a big packet sized as {} with smaller packets of {}", bufferSize, maxChunkSize);
}
while (bytes.remaining() > 0) {
int chunkSize = Math.min(bytes.remaining(), maxChunkSize);
if (logger.isTraceEnabled()) {
logger.trace("Sending a partial packet of {} bytes, starting at {}", chunkSize, bytes.remaining());
}
final ActiveMQBuffer chunk = transportConnection.createTransportBuffer(chunkSize);
chunk.writeBytes(bytes.data, bytes.offset, chunkSize);
transportConnection.write(chunk, true, false);
bytes.setOffset(bytes.getOffset() + chunkSize);
}
}
public void dispatchAsync(Command message) throws Exception {
dispatchSync(message);
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire;
import java.lang.invoke.MethodHandles;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.activemq.artemis.utils.DataConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** This MessageDecoder is based on LengthFieldBasedFrameDecoder.
* When OpenWire clients send a Large Message (large in the context of size only as openwire does not support message chunk streaming).
* In that context the server will transfer the huge frame to a Heap Buffer, instead of keeping a really large native buffer.
*
* There's a test showing this situation under ./soak-tests named OWLeakTest. The test will send 200MB messages. For every message sent we would have 200MB native buffers
* not leaving much space for the broker to handle its IO as most of the IO needs to be done with Native Memory.
* */
public class OpenWireFrameParser extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final int openwireMaxPacketChunkSize;
public OpenWireFrameParser(int openwireMaxPacketChunkSize) {
this.openwireMaxPacketChunkSize = openwireMaxPacketChunkSize;
}
ByteBuf outBuffer;
int bufferSize = -1;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (ctx.isRemoved()) {
return;
}
if (bufferSize == -1) {
if (in.readableBytes() < DataConstants.SIZE_INT) {
return;
}
bufferSize = in.getInt(in.readerIndex()) + DataConstants.SIZE_INT;
if (openwireMaxPacketChunkSize > 0 && bufferSize > openwireMaxPacketChunkSize) {
if (logger.isTraceEnabled()) {
logger.trace("Creating a heapBuffer sized as {} as it is beyond {} chunk limit", bufferSize, openwireMaxPacketChunkSize);
}
// we will use a heap buffer for large frames.
// to avoid competing for resources with the broker on native messages.
// to save the broker in case users send huge messages in openwire.
outBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(bufferSize);
}
}
if (outBuffer != null) {
int missingBytes = bufferSize - outBuffer.writerIndex();
int bytesToRead = Math.min(missingBytes, in.readableBytes());
outBuffer.writeBytes(in, bytesToRead);
if (outBuffer.writerIndex() == bufferSize) {
out.add(outBuffer);
outBuffer = null;
bufferSize = -1;
}
} else {
if (in.readableBytes() >= bufferSize) {
out.add(in.retainedSlice(in.readerIndex(), bufferSize));
in.skipBytes(bufferSize);
outBuffer = null;
bufferSize = -1;
}
}
}
}

View File

@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@ -56,7 +55,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
@ -105,6 +103,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
private int actorThresholdBytes = -1;
private BrokerId brokerId;
protected final ProducerId advisoryProducerId = new ProducerId();
@ -130,6 +129,10 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
private boolean openwireUseDuplicateDetectionOnFailover = true;
// if positive, packets will sent in chunks avoiding a single allocation
// this is to prevent large messages allocating really huge packets
private int openwireMaxPacketChunkSize = 100 * 1024;
//http://activemq.apache.org/activemq-inactivitymonitor.html
private long maxInactivityDuration = 30 * 1000L;
private long maxInactivityDurationInitalDelay = 10 * 1000L;
@ -142,6 +145,18 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
private int openWireDestinationCacheSize = 16;
/** if defined, LargeMessages will be sent in chunks to the network.
* Notice that the system will still load the entire file in memory before sending on the stream.
* This should avoid just a big buffer allocated. */
public int getOpenwireMaxPacketChunkSize() {
return openwireMaxPacketChunkSize;
}
public OpenWireProtocolManager setOpenwireMaxPacketChunkSize(int openwireMaxPacketChunkSize) {
this.openwireMaxPacketChunkSize = openwireMaxPacketChunkSize;
return this;
}
private final OpenWireFormat wireFormat;
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
@ -345,8 +360,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
@Override
public void addChannelHandlers(ChannelPipeline pipeline) {
// each read will have a full packet with this
pipeline.addLast("packet-decipher", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, DataConstants.SIZE_INT));
pipeline.addLast("large-frame-dealer", new OpenWireFrameParser(openwireMaxPacketChunkSize));
}
@Override

View File

@ -16,11 +16,14 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@ -41,6 +44,8 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
// with HotSpot 64-bit COOPS 8-byte align
private static final int MEMORY_OFFSET = 112 + LargeBody.MEMORY_OFFSET;
private static final int CHUNK_LM_SIZE = 100 * 1024;
@Override
public Message toMessage() {
return this;
@ -68,10 +73,32 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
private static Message asLargeMessage(Message message, StorageManager storageManager) throws Exception {
ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = buffer.readableBytes();
lsm.addBytes(buffer);
lsm.releaseResources(true, true);
ActiveMQBuffer messageBodyBuffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = messageBodyBuffer.readableBytes();
// I'm creating a native buffer here
// because FileChannelImpl (which is used by NIOSequentialFile) would create a Ghost Native Buffer
// that we would have no control. that's usually stored in a ThreadLocal within the native layer.
// to avoid that buffer be kept in memory holding resources we will allocate our own buffer here from the NettyPool.
// ./soakTest/OWLeakTest was written to validate this scenario here.
ByteBuf ioBuffer = PooledByteBufAllocator.DEFAULT.ioBuffer(CHUNK_LM_SIZE, CHUNK_LM_SIZE);
ActiveMQBuffer wrappedIOBuffer = new ChannelBufferWrapper(ioBuffer);
try {
// We write in chunks to avoid allocating a full NativeBody sized as the message size
// which might lead the broker out of resources
while (messageBodyBuffer.readableBytes() > 0) {
wrappedIOBuffer.clear(); // equivalent to setting writingIndex=readerIndex=0;
int bytesToRead = Math.min(CHUNK_LM_SIZE, messageBodyBuffer.readableBytes());
messageBodyBuffer.readBytes(wrappedIOBuffer, 0, bytesToRead);
wrappedIOBuffer.writerIndex(bytesToRead);
lsm.addBytes(wrappedIOBuffer);
}
} finally {
lsm.releaseResources(true, true);
ioBuffer.release();
}
if (!coreMessage.containsProperty(Message.HDR_LARGE_BODY_SIZE)) {
lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);

View File

@ -18,20 +18,25 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Map;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -54,23 +59,6 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
server.createQueue(new QueueConfiguration(lmDropAddress).setRoutingType(RoutingType.ANYCAST));
}
@Test
public void testSendLargeMessage() throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(lmAddress.toString());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// Create 1MB Message
int size = 1024 * 1024;
byte[] bytes = new byte[size];
BytesMessage message = session.createBytesMessage();
message.writeBytes(bytes);
producer.send(message);
}
}
@Override
protected void configureAddressSettings(Map<String, AddressSettings> addressSettingsMap) {
addressSettingsMap.put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true));
@ -78,17 +66,38 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
new AddressSettings()
.setMaxSizeBytes(100 * 1024)
.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP)
.setMaxSizeMessages(2)
.setMessageCounterHistoryDayLimit(10)
.setRedeliveryDelay(0)
.setMaxDeliveryAttempts(0));
}
@Test
public void testSendReceiveLargeMessage() throws Exception {
// Create 1MB Message
int size = 1024 * 1024;
public void testSendReceiveLargeMessageRestart() throws Exception {
internalSendReceiveLargeMessage(factory, true);
internalSendReceiveLargeMessage(CFUtil.createConnectionFactory("openwire", "tcp://localhost:61618"), true);
}
@Test
public void testSendReceiveLargeMessage() throws Exception {
internalSendReceiveLargeMessage(factory, false);
internalSendReceiveLargeMessage(CFUtil.createConnectionFactory("openwire", "tcp://localhost:61618"), false);
}
private void internalSendReceiveLargeMessage(ConnectionFactory factory, boolean restart) throws Exception {
// Create 1MB Message
String largeString;
{
String randomString = "This is a random String " + RandomUtil.randomString();
StringBuffer largeBuffer = new StringBuffer();
while (largeBuffer.length() < 1024 * 1024) {
largeBuffer.append(randomString);
}
largeString = largeBuffer.toString();
}
byte[] bytes = new byte[size];
try (Connection connection = factory.createConnection()) {
connection.start();
@ -98,15 +107,14 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
bytes[0] = 1;
BytesMessage message = session.createBytesMessage();
message.writeBytes(bytes);
TextMessage message = session.createTextMessage(largeString);
producer.send(message);
}
server.stop();
server.start();
if (restart) {
server.stop();
server.start();
}
try (Connection connection = factory.createConnection()) {
connection.start();
@ -115,13 +123,8 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
MessageConsumer consumer = session.createConsumer(queue);
BytesMessage m = (BytesMessage) consumer.receive();
assertNotNull(m);
byte[] body = new byte[size];
m.readBytes(body);
assertArrayEquals(body, bytes);
TextMessage m = (TextMessage) consumer.receive(5000);
assertEquals(largeString, m.getText());
}
}
@ -129,8 +132,8 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
public void testFastLargeMessageProducerDropOnPaging() throws Exception {
AssertionLoggerHandler.startCapture();
try {
// Create 100K Message
int size = 100 * 1024;
// Create 200K Message
int size = 200 * 1024;
final byte[] bytes = new byte[size];
@ -173,4 +176,17 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
AssertionLoggerHandler.stopCapture();
}
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
try {
// to validate the server would still work without MaxPackeSize configured
serverConfig.addAcceptorConfiguration("openwire", "tcp://0.0.0.0:61618?OPENWIRE;openwireMaxPacketSize=10 * 1024");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -183,6 +183,27 @@
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-openwire-leaktest</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>admin</user>
<password>admin</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>false</noWeb>
<instance>${basedir}/target/openwire-leaktest</instance>
<configuration>${basedir}/target/classes/servers/openwire-leaktest</configuration>
<args>
<arg>--java-memory</arg>
<arg>3G</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>

View File

@ -0,0 +1,253 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<!-- if you want to retain your journal uncomment this following configuration.
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
it is recommended to use a separate storage unit from the journal for performance considerations.
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>
You can also enable retention by using the argument journal-retention on the `artemis create` command -->
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size> -->
<!-- the maximum number of messages accepted before entering full address mode.
if global-max-size is specified the full address mode will be specified by whatever hits it first. -->
<global-max-messages>-1</global-max-messages>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
default: 102400, -1 would mean to disable large mesasge control -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false;openwireMaxPacketSize=102400</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- if max-size-bytes and max-size-messages were both enabled, the system will enter into paging
based on the first attribute to hits the maximum value -->
<!-- limit for the address in bytes, -1 means unlimited -->
<max-size-bytes>-1</max-size-bytes>
<!-- limit for the address in messages, -1 means unlimited -->
<max-size-messages>1000</max-size-messages>
<!-- the size of each file on paging. Notice we keep files in memory while they are in use.
Lower this setting if you have too many queues in memory. -->
<page-size-bytes>10M</page-size-bytes>
<!-- limit how many messages are read from paging into the Queue. -->
<max-read-page-messages>-1</max-read-page-messages>
<!-- limit how much memory is read from paging into the Queue. -->
<max-read-page-bytes>20M</max-read-page-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="true"/>
<property key="LOG_CONSUMER_EVENTS" value="true"/>
<property key="LOG_DELIVERING_EVENTS" value="true"/>
<property key="LOG_SENDING_EVENTS" value="true"/>
<property key="LOG_INTERNAL_EVENTS" value="true"/>
</broker-plugin>
</broker-plugins>
-->
</core>
</configuration>

View File

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.owleak;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.artemis.tests.soak.TestParameters.intMandatoryProperty;
import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
/**
* Refer to ./scripts/parameters.sh for suggested parameters
*
* Even though this test is not testing Paging, it will use Page just to generate enough load to the server to compete for resources in Native Buffers.
*
*/
@RunWith(Parameterized.class)
public class OWLeakTest extends SoakTestBase {
private static final int OK = 33; // arbitrary code. if the spawn returns this the test went fine
public static final String SERVER_NAME_0 = "openwire-leaktest";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String TEST_NAME = "OW_LEAK";
private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
private static final String PROTOCOL_LIST = testProperty(TEST_NAME, "PROTOCOL_LIST", "OPENWIRE");
private static final int TEST_TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMETOUT_MINUTES", 10);
private final String protocol;
private final int NUMBER_OF_MESSAGES;
private final int PRODUCERS;
private final int MESSAGE_SIZE;
Process serverProcess;
public OWLeakTest(String protocol) {
this.protocol = protocol;
NUMBER_OF_MESSAGES = intMandatoryProperty(TEST_NAME, protocol + "_NUMBER_OF_MESSAGES");
PRODUCERS = intMandatoryProperty(TEST_NAME, protocol + "_PRODUCERS");
MESSAGE_SIZE = intMandatoryProperty(TEST_NAME, protocol + "_MESSAGE_SIZE");
}
@Parameterized.Parameters(name = "protocol={0}")
public static Collection<Object[]> parameters() {
String[] protocols = PROTOCOL_LIST.split(",");
ArrayList<Object[]> parameters = new ArrayList<>();
for (String str : protocols) {
logger.debug("Adding {} to the list for the test", str);
parameters.add(new Object[]{str});
}
return parameters;
}
@Before
public void before() throws Exception {
Assume.assumeTrue(TEST_ENABLED);
cleanupData(SERVER_NAME_0);
serverProcess = startServer(SERVER_NAME_0, 0, 10_000);
}
private static String createLMBody(int messageSize, int producer, int sequence) {
StringBuffer buffer = new StringBuffer();
String baseString = "A Large body from producer " + producer + ", sequence " + sequence;
while (buffer.length() < messageSize) {
buffer.append(baseString);
}
return buffer.toString();
}
public static void main(String[] arg) {
int PRODUCERS = Integer.parseInt(arg[0]);
int NUMBER_OF_MESSAGES = Integer.parseInt(arg[1]);
int MESSAGE_SIZE = Integer.parseInt(arg[2]);
String protocol = arg[3];
ExecutorService service = Executors.newFixedThreadPool(PRODUCERS + 1 + 1);
String QUEUE_NAME = "some_queue";
Semaphore semaphore = new Semaphore(PRODUCERS + 1);
CountDownLatch latch = new CountDownLatch(PRODUCERS + 1 + 1);
AtomicBoolean running = new AtomicBoolean(true);
AtomicInteger errors = new AtomicInteger(0);
try {
for (int i = 0; i < PRODUCERS; i++) {
final int producerID = i;
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
service.execute(() -> {
try {
for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
TextMessage message = session.createTextMessage(createLMBody(MESSAGE_SIZE, producerID, msg));
message.setIntProperty("producerID", producerID);
message.setIntProperty("sequence", msg);
semaphore.acquire();
producer.send(message);
logger.debug("Thread {} Sent message with size {} with the total number of {} messages of {}", producerID, MESSAGE_SIZE, msg, NUMBER_OF_MESSAGES);
producer.close();
session.close();
connection.close();
}
} catch (Exception e) {
errors.incrementAndGet();
e.printStackTrace();
logger.warn(e.getMessage(), e);
} finally {
latch.countDown();
}
});
}
service.execute(() -> {
int[] producerSequence = new int[PRODUCERS];
try {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
connection.start();
for (int i = 0; i < NUMBER_OF_MESSAGES * PRODUCERS; i++) {
TextMessage message = (TextMessage) consumer.receive(60_000);
Assert.assertNotNull(message);
int producerID = message.getIntProperty("producerID");
int sequence = message.getIntProperty("sequence");
logger.debug("Received message {} from producer {}", sequence, producerID);
Assert.assertEquals(producerSequence[producerID], sequence);
producerSequence[producerID]++;
Assert.assertEquals(createLMBody(MESSAGE_SIZE, producerID, sequence), message.getText());
semaphore.release();
}
} catch (Throwable e) {
errors.incrementAndGet();
logger.warn(e.getMessage(), e);
} finally {
running.set(false);
latch.countDown();
}
});
service.execute(() -> {
// this is just creating enough loading somewhere else to compete for resources
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try {
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("fastQueue"));
MessageConsumer consumer = session.createConsumer(session.createQueue("fastQueue"));
connection.start();
long msg = 0;
char[] msgStr = new char[1024];
String buffer = new String(msgStr);
Arrays.fill(msgStr, 'a');
while (running.get()) {
TextMessage message = session.createTextMessage(buffer);
producer.send(message);
if (++msg % 10000L == 0L) {
logger.debug("Sent and receive {} fast messages", msg);
}
if (msg > 5000L) {
message = (TextMessage) consumer.receive(10000);
Assert.assertNotNull(message);
}
if (msg % 100L == 0L) {
session.commit();
}
}
session.commit();
producer.close();
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
errors.incrementAndGet();
e.printStackTrace();
logger.warn(e.getMessage(), e);
} finally {
latch.countDown();
running.set(false);
}
});
Assert.assertTrue(latch.await(TEST_TIMEOUT_MINUTES, TimeUnit.MINUTES));
Assert.assertEquals(0, errors.get());
System.exit(OK);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
}
@Test
public void testValidateLeaks() throws Exception {
// I am using a spawn for the test client, as this test will need a big VM for the client.
// so I need control over the memory size for the VM.
Process process = SpawnedVMSupport.spawnVM(OWLeakTest.class.getName(), new String[]{"-Xmx3G"}, "" + PRODUCERS, "" + NUMBER_OF_MESSAGES, "" + MESSAGE_SIZE, protocol);
logger.debug("Process PID::{}", process.pid());
Assert.assertTrue(process.waitFor(TEST_TIMEOUT_MINUTES, TimeUnit.MINUTES));
Assert.assertEquals(OK, process.exitValue());
}
}

View File

@ -48,7 +48,7 @@ import static org.apache.activemq.artemis.tests.soak.TestParameters.intMandatory
import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
/**
* Refer to ./scripts/parameters-paging.sh for suggested parameters
* Refer to ./scripts/parameters.sh for suggested parameters
* #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging
* export TEST_FLOW_ZIP_LOCATION=a folder */
@RunWith(Parameterized.class)

View File

@ -49,7 +49,7 @@ import static org.apache.activemq.artemis.tests.soak.TestParameters.intMandatory
import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
/**
* Refer to ./scripts/parameters-paging.sh for suggested parameters
* Refer to ./scripts/parameters.sh for suggested parameters
* #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging
* export TEST_HORIZONTAL_ZIP_LOCATION=a folder
* */

View File

@ -51,7 +51,7 @@ import static org.apache.activemq.artemis.tests.soak.TestParameters.intMandatory
import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
/**
* Refer to ./scripts/parameters-paging.sh for suggested parameters
* Refer to ./scripts/parameters.sh for suggested parameters
* #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging
* export TEST_FLOW_ZIP_LOCATION=a folder */
@RunWith(Parameterized.class)

View File

@ -22,6 +22,8 @@
# It is possible to save the producer's time. If you set this variable the test will reuse previously sent data by zip and unzipping the data folder
#export TEST_ZIP_LOCATION=~/zipTest/
echo "parameters-paging has been deprecated, please use parameters.sh"
#HorizontalPagingTest
export TEST_HORIZONTAL_TEST_ENABLED=true
@ -87,4 +89,13 @@ export TEST_SUBSCRIPTION_CORE_MESSAGES=10000
export TEST_SUBSCRIPTION_CORE_COMMIT_INTERVAL=1000
export TEST_SUBSCRIPTION_CORE_RECEIVE_COMMIT_INTERVAL=0
export TEST_SUBSCRIPTION_CORE_MESSAGE_SIZE=30000
export TEST_SUBSCRIPTION_SLEEP_SLOW=1000
export TEST_SUBSCRIPTION_SLEEP_SLOW=1000
#OWLeakTest
export TEST_OW_LEAK_TEST_ENABLED=true
export TEST_OW_LEAK_PROTOCOL_LIST=OPENWIRE
export TEST_OW_LEAK_OPENWIRE_NUMBER_OF_MESSAGES=15
export TEST_OW_LEAK_OPENWIRE_PRODUCERS=1
export TEST_OW_LEAK_OPENWIRE_MESSAGE_SIZE=200000000
export TEST_OW_LEAK_PRINT_INTERVAL=1

View File

@ -0,0 +1,99 @@
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# this script contains a suggest set of variables to run the soak tests.
## Generic variable:
# Some tests will support saving the producer's state before consumption. If you set this variable these tests will hold a zip file and recover it approprieatedly.
#export TEST_ZIP_LOCATION=~/zipTest/
#HorizontalPagingTest
export TEST_HORIZONTAL_TEST_ENABLED=true
export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000
export TEST_HORIZONTAL_TIMEOUT_MINUTES=120
export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP
export TEST_HORIZONTAL_CORE_DESTINATIONS=200
export TEST_HORIZONTAL_CORE_MESSAGES=1000
export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=100
export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0
export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000
export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=10
export TEST_HORIZONTAL_AMQP_DESTINATIONS=200
export TEST_HORIZONTAL_AMQP_MESSAGES=1000
export TEST_HORIZONTAL_AMQP_COMMIT_INTERVAL=100
export TEST_HORIZONTAL_AMQP_RECEIVE_COMMIT_INTERVAL=0
export TEST_HORIZONTAL_AMQP_MESSAGE_SIZE=20000
export TEST_HORIZONTAL_AMQP_PARALLEL_SENDS=10
export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=200
export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000
export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100
export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0
export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000
export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10
export TEST_FLOW_SERVER_START_TIMEOUT=300000
export TEST_FLOW_TIMEOUT_MINUTES=120
# FlowControlPagingTest
export TEST_FLOW_PROTOCOL_LIST=CORE,AMQP,OPENWIRE
export TEST_FLOW_PRINT_INTERVAL=100
export TEST_FLOW_OPENWIRE_MESSAGES=10000
export TEST_FLOW_OPENWIRE_COMMIT_INTERVAL=1000
export TEST_FLOW_OPENWIRE_RECEIVE_COMMIT_INTERVAL=10
export TEST_FLOW_OPENWIRE_MESSAGE_SIZE=60000
export TEST_FLOW_CORE_MESSAGES=10000
export TEST_FLOW_CORE_COMMIT_INTERVAL=1000
export TEST_FLOW_CORE_RECEIVE_COMMIT_INTERVAL=10
export TEST_FLOW_CORE_MESSAGE_SIZE=30000
export TEST_FLOW_AMQP_MESSAGES=10000
export TEST_FLOW_AMQP_COMMIT_INTERVAL=1000
export TEST_FLOW_AMQP_RECEIVE_COMMIT_INTERVAL=10
export TEST_FLOW_AMQP_MESSAGE_SIZE=30000
# SubscriptionPagingTest
export TEST_SUBSCRIPTION_PROTOCOL_LIST=CORE
export TEST_SUBSCRIPTION_SERVER_START_TIMEOUT=300000
export TEST_SUBSCRIPTION_TIMEOUT_MINUTES=120
export TEST_SUBSCRIPTION_PRINT_INTERVAL=100
export TEST_SUBSCRIPTION_SLOW_SUBSCRIPTIONS=5
export TEST_SUBSCRIPTION_CORE_MESSAGES=10000
export TEST_SUBSCRIPTION_CORE_COMMIT_INTERVAL=1000
export TEST_SUBSCRIPTION_CORE_RECEIVE_COMMIT_INTERVAL=0
export TEST_SUBSCRIPTION_CORE_MESSAGE_SIZE=30000
export TEST_SUBSCRIPTION_SLEEP_SLOW=1000
#OWLeakTest
export TEST_OW_LEAK_TEST_ENABLED=true
export TEST_OW_LEAK_PROTOCOL_LIST=OPENWIRE
export TEST_OW_LEAK_OPENWIRE_NUMBER_OF_MESSAGES=15
export TEST_OW_LEAK_OPENWIRE_PRODUCERS=1
export TEST_OW_LEAK_OPENWIRE_MESSAGE_SIZE=200000000
export TEST_OW_LEAK_PRINT_INTERVAL=1