diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java new file mode 100644 index 0000000000..c7e51d70a8 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.ToIntFunction; + +import org.jboss.logging.Logger; + +public class ThresholdActor extends ProcessorBase { + + private static final Logger logger = Logger.getLogger(ThresholdActor.class); + + private static final AtomicIntegerFieldUpdater SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThresholdActor.class, "size"); + private volatile int size = 0; + + private static final AtomicIntegerFieldUpdater SCHEDULED_FLUSH_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThresholdActor.class, "scheduledFlush"); + private volatile int scheduledFlush = 0; + + private static final Object FLUSH = new Object(); + + private final int maxSize; + private final ToIntFunction sizeGetter; + private final ActorListener listener; + private final Runnable overThreshold; + private final Runnable clearThreshold; + + public ThresholdActor(Executor parent, ActorListener listener, int maxSize, ToIntFunction sizeGetter, Runnable overThreshold, Runnable clearThreshold) { + super(parent); + this.listener = listener; + this.maxSize = maxSize; + this.sizeGetter = sizeGetter; + this.overThreshold = overThreshold; + this.clearThreshold = clearThreshold; + } + + @Override + protected final void doTask(Object task) { + if (task == FLUSH) { + clearThreshold.run(); + // should set to 0 no matter the value. There's a single thread setting this value back to zero + SCHEDULED_FLUSH_UPDATER.set(this, 0); + return; + } + + final T theTask = (T)task; + + int estimateSize = sizeGetter.applyAsInt(theTask); + + try { + listener.onMessage(theTask); + } finally { + if (estimateSize > 0) { + SIZE_UPDATER.getAndAdd(this, -size); + } else { + logger.debug("element " + theTask + " returned an invalid size over the Actor during release"); + } + } + } + + public void act(T message) { + int sizeEstimate = sizeGetter.applyAsInt(message); + if (sizeEstimate > 0) { + int size = SIZE_UPDATER.addAndGet(this, sizeGetter.applyAsInt(message)); + if (size > maxSize) { + flush(); + } + } else { + logger.debug("element " + message + " returned an invalid size over the Actor"); + } + task(message); + } + + public void flush() { + if (SCHEDULED_FLUSH_UPDATER.compareAndSet(this, 0, 1)) { + overThreshold.run(); + task(FLUSH); + } + } +} \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java new file mode 100644 index 0000000000..5c715ecc3f --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Test; + +public class ThresholdActorTest { + + Semaphore semaphore = new Semaphore(1); + AtomicInteger result = new AtomicInteger(0); + AtomicInteger lastProcessed = new AtomicInteger(0); + AtomicInteger errors = new AtomicInteger(0); + + @Test + public void limitedSize() throws Exception { + lastProcessed.set(0); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + AtomicInteger timesOpen = new AtomicInteger(0); + AtomicInteger timesClose = new AtomicInteger(0); + AtomicBoolean open = new AtomicBoolean(true); + try { + semaphore.acquire(); + ThresholdActor actor = new ThresholdActor<>(executorService, this::limitedProcess, 10, (s) -> 1, () -> { + timesClose.incrementAndGet(); + open.set(false); + }, () -> { + timesOpen.incrementAndGet(); + open.set(true); + }); + + for (int i = 0; i < 10; i++) { + actor.act(i); + } + Assert.assertTrue(open.get()); + Assert.assertEquals(0, timesClose.get()); + + actor.act(99); + Assert.assertEquals(1, timesClose.get()); + Assert.assertEquals(0, timesOpen.get()); + + Assert.assertFalse(open.get()); + + actor.act(1000); + + actor.flush(); // a flush here shuld not change anything, as it was already called once on the previous overflow + Assert.assertEquals(1, timesClose.get()); + Assert.assertEquals(0, timesOpen.get()); + Assert.assertFalse(open.get()); + + semaphore.release(); + Wait.assertTrue(open::get); + + Assert.assertEquals(1, timesClose.get()); + Assert.assertEquals(1, timesOpen.get()); + Wait.assertEquals(1000, lastProcessed::get, 5000, 1); + + actor.flush(); + + open.set(false); + + // measuring after forced flush + Wait.assertEquals(2, timesOpen::get, 5000, 1); + Wait.assertTrue(open::get); + } finally { + executorService.shutdown(); + } + } + + public void limitedProcess(Integer i) { + try { + semaphore.acquire(); + result.incrementAndGet(); + lastProcessed.set(i); + semaphore.release(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + static class Element { + Element(int i, int size) { + this.i = i; + this.size = size; + } + int i; + int size; + } + + private static int getSize(Element e) { + return e.size; + } + + protected void process(Element e) { + lastProcessed.set(e.i); + } + + public void block() { + try { + if (!semaphore.tryAcquire()) { + errors.incrementAndGet(); + System.err.println("acquire failed"); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testFlow() throws Exception { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + + try { + ThresholdActor actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, semaphore::release); + + final int LAST_ELEMENT = 1000; + + for (int i = 0; i <= LAST_ELEMENT; i++) { + actor.act(new Element(i, i % 2 == 0 ? 20 : 1)); + } + + Wait.assertEquals(LAST_ELEMENT, lastProcessed::get); + Assert.assertEquals(0, errors.get()); + } finally { + executorService.shutdown(); + } + } + + +} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index c0c2a2ffa7..6820566749 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.remoting.FailureListener; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -83,7 +84,7 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.UUIDGenerator; -import org.apache.activemq.artemis.utils.actors.Actor; +import org.apache.activemq.artemis.utils.actors.ThresholdActor; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -138,6 +139,9 @@ import org.jboss.logging.Logger; */ public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth, TempQueueObserver { + // to be used on the packet size estimate processing for the ThresholdActor + private static final int MINIMAL_SIZE_ESTIAMTE = 1024; + private static final Logger logger = Logger.getLogger(OpenWireConnection.class); private static final KeepAliveInfo PING = new KeepAliveInfo(); @@ -153,6 +157,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private AMQConnectionContext context; + private final int actorThresholdBytes; + private final AtomicBoolean stopping = new AtomicBoolean(false); private final Map sessionIdMap = new ConcurrentHashMap<>(); @@ -188,10 +194,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private static final AtomicLongFieldUpdater LAST_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(OpenWireConnection.class, "lastSent"); private volatile long lastSent = -1; + + private volatile boolean autoRead = true; + private ConnectionEntry connectionEntry; private boolean useKeepAlive; private long maxInactivityDuration; - private volatile Actor openWireActor; + private volatile ThresholdActor openWireActor; private final Set knownDestinations = new ConcurrentHashSet<>(); @@ -204,6 +213,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se OpenWireProtocolManager openWireProtocolManager, OpenWireFormat wf, Executor executor) { + this(connection, server, openWireProtocolManager, wf, executor, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE); + } + + public OpenWireConnection(Connection connection, + ActiveMQServer server, + OpenWireProtocolManager openWireProtocolManager, + OpenWireFormat wf, + Executor executor, + int actorThresholdBytes) { super(connection, executor); this.server = server; this.operationContext = server.newOperationContext(); @@ -213,6 +231,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration(); this.transportConnection.setProtocolConnection(this); + this.actorThresholdBytes = actorThresholdBytes; } // SecurityAuth implementation @@ -285,9 +304,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se traceBufferReceived(connectionID, command); } - final Actor localVisibleActor = openWireActor; + final ThresholdActor localVisibleActor = openWireActor; if (localVisibleActor != null) { - openWireActor.act(command); + localVisibleActor.act(command); } else { act(command); } @@ -298,6 +317,30 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } + public void restoreAutoRead() { + if (!autoRead) { + autoRead = true; + openWireActor.flush(); + } + } + + public void blockConnection() { + autoRead = false; + disableAutoRead(); + } + + private void disableAutoRead() { + getTransportConnection().setAutoRead(false); + disableTtl(); + } + + protected void flushedActor() { + getTransportConnection().setAutoRead(autoRead); + if (autoRead) { + enableTtl(); + } + } + private void act(Command command) { try { @@ -765,11 +808,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se createInternalSession(info); // the actor can only be used after the WireFormat has been initialized with versioning - this.openWireActor = new Actor<>(executor, this::act); + this.openWireActor = new ThresholdActor<>(executor, this::act, actorThresholdBytes, OpenWireConnection::getSize, this::disableAutoRead, this::flushedActor); return context; } + private static int getSize(Command command) { + if (command instanceof ActiveMQMessage) { + return ((ActiveMQMessage) command).getSize(); + } else { + return MINIMAL_SIZE_ESTIAMTE; + } + } + private void createInternalSession(ConnectionInfo info) throws Exception { internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 8433c9c838..2b73423c54 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -41,7 +41,9 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; @@ -97,6 +99,8 @@ public class OpenWireProtocolManager extends AbstractProtocolManager 0) { + // replace any previous value + actorThreshold = this.actorThresholdBytes; + } + + return actorThreshold; + } + @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); - OpenWireConnection owConn = new OpenWireConnection(connection, server, this, wf, server.getExecutorFactory().getExecutor()); + OpenWireConnection owConn = new OpenWireConnection(connection, server, this, wf, server.getExecutorFactory().getExecutor(), getActorThreadshold(acceptorUsed)); owConn.sendHandshake(); //first we setup ttl to -1 diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index af6e04ada5..d02dffb321 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -89,8 +89,6 @@ public class AMQSession implements SessionCallback { private final OpenWireProtocolManager protocolManager; - private final Runnable enableAutoReadAndTtl; - private final CoreMessageObjectPools coreMessageObjectPools; private String[] existingQueuesCache; @@ -110,7 +108,6 @@ public class AMQSession implements SessionCallback { this.protocolManager = protocolManager; this.scheduledPool = protocolManager.getScheduledPool(); this.protocolManagerWireFormat = protocolManager.wireFormat().copy(); - this.enableAutoReadAndTtl = this::enableAutoReadAndTtl; this.existingQueuesCache = null; this.coreMessageObjectPools = coreMessageObjectPools; } @@ -424,20 +421,16 @@ public class AMQSession implements SessionCallback { } final PagingStore store = server.getPagingManager().getPageStore(address); - this.connection.disableTtl(); if (shouldBlockProducer) { sendShouldBlockProducer(producerInfo, messageSend, sendProducerAck, store, dest, count, coreMsg, address); } else { - //non-persistent messages goes here, by default we stop reading from - //transport - connection.getTransportConnection().setAutoRead(false); if (store != null) { - if (!store.checkMemory(enableAutoReadAndTtl)) { - enableAutoReadAndTtl(); + if (!store.checkMemory(true, this::restoreAutoRead, this::blockConnection)) { + restoreAutoRead(); throw new ResourceAllocationException("Queue is full " + address); } } else { - enableAutoReadAndTtl.run(); + restoreAutoRead(); } getCoreSession().send(coreMsg, false, dest.isTemporary()); @@ -515,7 +508,7 @@ public class AMQSession implements SessionCallback { } }; if (store != null) { - if (!store.checkMemory(false, task)) { + if (!store.checkMemory(false, task, null)) { this.connection.getContext().setDontSendReponse(false); connection.enableTtl(); throw new ResourceAllocationException("Queue is full " + address); @@ -525,9 +518,12 @@ public class AMQSession implements SessionCallback { } } - private void enableAutoReadAndTtl() { - connection.getTransportConnection().setAutoRead(true); - connection.enableTtl(); + private void restoreAutoRead() { + connection.restoreAutoRead(); + } + + private void blockConnection() { + connection.blockConnection(); } public String convertWildcard(ActiveMQDestination openWireDest) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index ddc12e6733..05d030a06d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -132,7 +132,7 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener boolean checkMemory(Runnable runnable); - boolean checkMemory(boolean runOnFailure, Runnable runnable); + boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable runWhenBlocking); boolean isFull(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 89eadcfda8..1b9b81f9de 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -691,11 +691,11 @@ public class PagingStoreImpl implements PagingStore { @Override public boolean checkMemory(final Runnable runWhenAvailable) { - return checkMemory(true, runWhenAvailable); + return checkMemory(true, runWhenAvailable, null); } @Override - public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) { + public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable, Runnable runWhenBlocking) { if (blockedViaAddressControl) { if (runWhenAvailable != null) { @@ -713,6 +713,9 @@ public class PagingStoreImpl implements PagingStore { } } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >= maxSize || pagingManager.isGlobalFull()) { + if (runWhenBlocking != null) { + runWhenBlocking.run(); + } onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 9e19fc5e84..c3d62af3e0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -396,6 +396,10 @@ public class NettyAcceptor extends AbstractAcceptor { } } + public int getTcpReceiveBufferSize() { + return tcpReceiveBufferSize; + } + @Override public synchronized void start() throws Exception { if (channelClazz != null) { diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 4b154bfb7e..ad39c3a3ed 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -410,7 +410,7 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { } @Override - public boolean checkMemory(boolean runOnFailure, Runnable runnable) { + public boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable ignoredRunnable) { return false; } diff --git a/tests/smoke-tests/src/main/resources/servers/paging/broker.xml b/tests/smoke-tests/src/main/resources/servers/paging/broker.xml index 46ccdec9c5..bbe3b880ef 100644 --- a/tests/smoke-tests/src/main/resources/servers/paging/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/paging/broker.xml @@ -103,7 +103,7 @@ under the License. - tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300;actorThresholdBytes=10000 tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300 diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java new file mode 100644 index 0000000000..62323de158 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FloodServerWithAsyncSendTest extends SmokeTestBase { + + private static final Logger logger = Logger.getLogger(FloodServerWithAsyncSendTest.class); + public static final String SERVER_NAME_0 = "paging"; + + volatile boolean running = true; + + AtomicInteger errors = new AtomicInteger(0); + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + startServer(SERVER_NAME_0, 0, 30000); + } + + @Test + public void testAsyncPagingOpenWire() throws Exception { + String protocol = "OPENWIRE"; + internalTest(protocol); + + } + + ConnectionFactory newCF(String protocol) { + if (protocol.equalsIgnoreCase("OPENWIRE")) { + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.useAsyncSend=true"); + } else { + Assert.fail("unsuported protocol"); + return null; + } + } + + private void internalTest(String protocol) throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(4); + try { + for (int i = 0; i < 2; i++) { + final String queueName = "queue" + i; + executorService.execute(() -> produce(protocol, queueName)); + executorService.execute(() -> infiniteConsume(protocol, queueName)); + } + + Thread.sleep(10_000); + + running = false; + + executorService.shutdown(); + Assert.assertTrue(executorService.awaitTermination(1, TimeUnit.MINUTES)); + + for (int i = 0; i < 2; i++) { + Assert.assertEquals("should have received at least a few messages", 20, consume(protocol, "queue" + i, 20)); + } + + ConnectionFactory factory = newCF("openwire"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("queue3"); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = session.createProducer(queue); + + String random = RandomUtil.randomString(); + + producer.send(session.createTextMessage(random)); + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(random, message.getText()); + connection.close(); + + Assert.assertEquals(0, errors.get()); + } finally { + running = false; + executorService.shutdownNow(); // just to avoid thread leakage in case anything failed on the test + } + + } + + + protected int infiniteConsume(String protocol, String queueName) { + ConnectionFactory factory = newCF(protocol); + Connection connection = null; + int rec = 0; + try { + connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + while (running) { + if (consumer.receive(5000) != null) { + rec++; + } else { + break; + } + if (rec % 10 == 0) { + logger.info(queueName + " receive " + rec); + } + } + + return rec; + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + return -1; + } finally { + try { + connection.close(); + } catch (Exception ignored) { + } + } + } + + + + protected int consume(String protocol, String queueName, int maxCount) throws Exception { + ConnectionFactory factory = newCF(protocol); + Connection connection = null; + int rec = 0; + try { + connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + while (rec < maxCount) { + if (consumer.receive(5000) != null) { + rec++; + } else { + break; + } + if (rec % 10 == 0) { + logger.info(queueName + " receive " + rec); + } + } + + return rec; + } finally { + try { + connection.close(); + } catch (Exception ignored) { + } + } + } + + protected void produce(String protocol, String queueName) { + + int produced = 0; + ConnectionFactory factory = newCF(protocol); + Connection connection = null; + try { + + connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + String randomString; + { + StringBuffer buffer = new StringBuffer(); + while (buffer.length() < 10000) { + buffer.append(RandomUtil.randomString()); + } + randomString = buffer.toString(); + } + + while (running) { + if (++produced % 10 == 0) { + logger.info(queueName + " produced " + produced + " messages"); + } + producer.send(session.createTextMessage(randomString)); + } + + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + try { + connection.close(); + } catch (Exception ignored) { + } + } + } + +}