ARTEMIS-3604 Async sends overflowing server in OpenWire

This commit is contained in:
Clebert Suconic 2021-12-10 12:41:50 -05:00 committed by clebertsuconic
parent 1e89ce828c
commit 1e62979577
11 changed files with 578 additions and 25 deletions

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.actors;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.ToIntFunction;
import org.jboss.logging.Logger;
public class ThresholdActor<T> extends ProcessorBase<Object> {
private static final Logger logger = Logger.getLogger(ThresholdActor.class);
private static final AtomicIntegerFieldUpdater<ThresholdActor> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThresholdActor.class, "size");
private volatile int size = 0;
private static final AtomicIntegerFieldUpdater<ThresholdActor> SCHEDULED_FLUSH_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThresholdActor.class, "scheduledFlush");
private volatile int scheduledFlush = 0;
private static final Object FLUSH = new Object();
private final int maxSize;
private final ToIntFunction<T> sizeGetter;
private final ActorListener<T> listener;
private final Runnable overThreshold;
private final Runnable clearThreshold;
public ThresholdActor(Executor parent, ActorListener<T> listener, int maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable clearThreshold) {
super(parent);
this.listener = listener;
this.maxSize = maxSize;
this.sizeGetter = sizeGetter;
this.overThreshold = overThreshold;
this.clearThreshold = clearThreshold;
}
@Override
protected final void doTask(Object task) {
if (task == FLUSH) {
clearThreshold.run();
// should set to 0 no matter the value. There's a single thread setting this value back to zero
SCHEDULED_FLUSH_UPDATER.set(this, 0);
return;
}
final T theTask = (T)task;
int estimateSize = sizeGetter.applyAsInt(theTask);
try {
listener.onMessage(theTask);
} finally {
if (estimateSize > 0) {
SIZE_UPDATER.getAndAdd(this, -size);
} else {
logger.debug("element " + theTask + " returned an invalid size over the Actor during release");
}
}
}
public void act(T message) {
int sizeEstimate = sizeGetter.applyAsInt(message);
if (sizeEstimate > 0) {
int size = SIZE_UPDATER.addAndGet(this, sizeGetter.applyAsInt(message));
if (size > maxSize) {
flush();
}
} else {
logger.debug("element " + message + " returned an invalid size over the Actor");
}
task(message);
}
public void flush() {
if (SCHEDULED_FLUSH_UPDATER.compareAndSet(this, 0, 1)) {
overThreshold.run();
task(FLUSH);
}
}
}

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.actors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
public class ThresholdActorTest {
Semaphore semaphore = new Semaphore(1);
AtomicInteger result = new AtomicInteger(0);
AtomicInteger lastProcessed = new AtomicInteger(0);
AtomicInteger errors = new AtomicInteger(0);
@Test
public void limitedSize() throws Exception {
lastProcessed.set(0);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
AtomicInteger timesOpen = new AtomicInteger(0);
AtomicInteger timesClose = new AtomicInteger(0);
AtomicBoolean open = new AtomicBoolean(true);
try {
semaphore.acquire();
ThresholdActor<Integer> actor = new ThresholdActor<>(executorService, this::limitedProcess, 10, (s) -> 1, () -> {
timesClose.incrementAndGet();
open.set(false);
}, () -> {
timesOpen.incrementAndGet();
open.set(true);
});
for (int i = 0; i < 10; i++) {
actor.act(i);
}
Assert.assertTrue(open.get());
Assert.assertEquals(0, timesClose.get());
actor.act(99);
Assert.assertEquals(1, timesClose.get());
Assert.assertEquals(0, timesOpen.get());
Assert.assertFalse(open.get());
actor.act(1000);
actor.flush(); // a flush here shuld not change anything, as it was already called once on the previous overflow
Assert.assertEquals(1, timesClose.get());
Assert.assertEquals(0, timesOpen.get());
Assert.assertFalse(open.get());
semaphore.release();
Wait.assertTrue(open::get);
Assert.assertEquals(1, timesClose.get());
Assert.assertEquals(1, timesOpen.get());
Wait.assertEquals(1000, lastProcessed::get, 5000, 1);
actor.flush();
open.set(false);
// measuring after forced flush
Wait.assertEquals(2, timesOpen::get, 5000, 1);
Wait.assertTrue(open::get);
} finally {
executorService.shutdown();
}
}
public void limitedProcess(Integer i) {
try {
semaphore.acquire();
result.incrementAndGet();
lastProcessed.set(i);
semaphore.release();
} catch (Throwable e) {
e.printStackTrace();
}
}
static class Element {
Element(int i, int size) {
this.i = i;
this.size = size;
}
int i;
int size;
}
private static int getSize(Element e) {
return e.size;
}
protected void process(Element e) {
lastProcessed.set(e.i);
}
public void block() {
try {
if (!semaphore.tryAcquire()) {
errors.incrementAndGet();
System.err.println("acquire failed");
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testFlow() throws Exception {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
ThresholdActor<Element> actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, semaphore::release);
final int LAST_ELEMENT = 1000;
for (int i = 0; i <= LAST_ELEMENT; i++) {
actor.act(new Element(i, i % 2 == 0 ? 20 : 1));
}
Wait.assertEquals(LAST_ELEMENT, lastProcessed::get);
Assert.assertEquals(0, errors.get());
} finally {
executorService.shutdown();
}
}
}

View File

@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -83,7 +84,7 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.ThresholdActor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -138,6 +139,9 @@ import org.jboss.logging.Logger;
*/
public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth, TempQueueObserver {
// to be used on the packet size estimate processing for the ThresholdActor
private static final int MINIMAL_SIZE_ESTIAMTE = 1024;
private static final Logger logger = Logger.getLogger(OpenWireConnection.class);
private static final KeepAliveInfo PING = new KeepAliveInfo();
@ -153,6 +157,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private AMQConnectionContext context;
private final int actorThresholdBytes;
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
@ -188,10 +194,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private static final AtomicLongFieldUpdater<OpenWireConnection> LAST_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(OpenWireConnection.class, "lastSent");
private volatile long lastSent = -1;
private volatile boolean autoRead = true;
private ConnectionEntry connectionEntry;
private boolean useKeepAlive;
private long maxInactivityDuration;
private volatile Actor<Command> openWireActor;
private volatile ThresholdActor<Command> openWireActor;
private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
@ -204,6 +213,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
OpenWireProtocolManager openWireProtocolManager,
OpenWireFormat wf,
Executor executor) {
this(connection, server, openWireProtocolManager, wf, executor, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE);
}
public OpenWireConnection(Connection connection,
ActiveMQServer server,
OpenWireProtocolManager openWireProtocolManager,
OpenWireFormat wf,
Executor executor,
int actorThresholdBytes) {
super(connection, executor);
this.server = server;
this.operationContext = server.newOperationContext();
@ -213,6 +231,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration();
this.transportConnection.setProtocolConnection(this);
this.actorThresholdBytes = actorThresholdBytes;
}
// SecurityAuth implementation
@ -285,9 +304,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
traceBufferReceived(connectionID, command);
}
final Actor<Command> localVisibleActor = openWireActor;
final ThresholdActor<Command> localVisibleActor = openWireActor;
if (localVisibleActor != null) {
openWireActor.act(command);
localVisibleActor.act(command);
} else {
act(command);
}
@ -298,6 +317,30 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
public void restoreAutoRead() {
if (!autoRead) {
autoRead = true;
openWireActor.flush();
}
}
public void blockConnection() {
autoRead = false;
disableAutoRead();
}
private void disableAutoRead() {
getTransportConnection().setAutoRead(false);
disableTtl();
}
protected void flushedActor() {
getTransportConnection().setAutoRead(autoRead);
if (autoRead) {
enableTtl();
}
}
private void act(Command command) {
try {
@ -765,11 +808,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
createInternalSession(info);
// the actor can only be used after the WireFormat has been initialized with versioning
this.openWireActor = new Actor<>(executor, this::act);
this.openWireActor = new ThresholdActor<>(executor, this::act, actorThresholdBytes, OpenWireConnection::getSize, this::disableAutoRead, this::flushedActor);
return context;
}
private static int getSize(Command command) {
if (command instanceof ActiveMQMessage) {
return ((ActiveMQMessage) command).getSize();
} else {
return MINIMAL_SIZE_ESTIAMTE;
}
}
private void createInternalSession(ConnectionInfo info) throws Exception {
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser);
}

View File

@ -41,7 +41,9 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -97,6 +99,8 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
private boolean prefixPacketSize = true;
private int actorThresholdBytes = -1;
private BrokerId brokerId;
protected final ProducerId advisoryProducerId = new ProducerId();
@ -236,6 +240,17 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
}
}
/*** if set, the OpenWire connection will bypass the tcpReadBuferSize and use this value instead.
* This is by default -1, and it should not be used unless in extreme situations like on a slow storage. */
public int getActorThresholdBytes() {
return actorThresholdBytes;
}
public OpenWireProtocolManager setActorThresholdBytes(int actorThresholdBytes) {
this.actorThresholdBytes = actorThresholdBytes;
return this;
}
public ScheduledExecutorService getScheduledPool() {
return scheduledPool;
}
@ -293,10 +308,25 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
return super.invokeInterceptors(this.outgoingInterceptors, command, connection);
}
private int getActorThreadshold(Acceptor acceptorUsed) {
int actorThreshold = TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE;
if (acceptorUsed instanceof NettyAcceptor) {
actorThreshold = ((NettyAcceptor) acceptorUsed).getTcpReceiveBufferSize();
}
if (this.actorThresholdBytes > 0) {
// replace any previous value
actorThreshold = this.actorThresholdBytes;
}
return actorThreshold;
}
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
OpenWireConnection owConn = new OpenWireConnection(connection, server, this, wf, server.getExecutorFactory().getExecutor());
OpenWireConnection owConn = new OpenWireConnection(connection, server, this, wf, server.getExecutorFactory().getExecutor(), getActorThreadshold(acceptorUsed));
owConn.sendHandshake();
//first we setup ttl to -1

View File

@ -89,8 +89,6 @@ public class AMQSession implements SessionCallback {
private final OpenWireProtocolManager protocolManager;
private final Runnable enableAutoReadAndTtl;
private final CoreMessageObjectPools coreMessageObjectPools;
private String[] existingQueuesCache;
@ -110,7 +108,6 @@ public class AMQSession implements SessionCallback {
this.protocolManager = protocolManager;
this.scheduledPool = protocolManager.getScheduledPool();
this.protocolManagerWireFormat = protocolManager.wireFormat().copy();
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
this.existingQueuesCache = null;
this.coreMessageObjectPools = coreMessageObjectPools;
}
@ -424,20 +421,16 @@ public class AMQSession implements SessionCallback {
}
final PagingStore store = server.getPagingManager().getPageStore(address);
this.connection.disableTtl();
if (shouldBlockProducer) {
sendShouldBlockProducer(producerInfo, messageSend, sendProducerAck, store, dest, count, coreMsg, address);
} else {
//non-persistent messages goes here, by default we stop reading from
//transport
connection.getTransportConnection().setAutoRead(false);
if (store != null) {
if (!store.checkMemory(enableAutoReadAndTtl)) {
enableAutoReadAndTtl();
if (!store.checkMemory(true, this::restoreAutoRead, this::blockConnection)) {
restoreAutoRead();
throw new ResourceAllocationException("Queue is full " + address);
}
} else {
enableAutoReadAndTtl.run();
restoreAutoRead();
}
getCoreSession().send(coreMsg, false, dest.isTemporary());
@ -515,7 +508,7 @@ public class AMQSession implements SessionCallback {
}
};
if (store != null) {
if (!store.checkMemory(false, task)) {
if (!store.checkMemory(false, task, null)) {
this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
@ -525,9 +518,12 @@ public class AMQSession implements SessionCallback {
}
}
private void enableAutoReadAndTtl() {
connection.getTransportConnection().setAutoRead(true);
connection.enableTtl();
private void restoreAutoRead() {
connection.restoreAutoRead();
}
private void blockConnection() {
connection.blockConnection();
}
public String convertWildcard(ActiveMQDestination openWireDest) {

View File

@ -132,7 +132,7 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
boolean checkMemory(Runnable runnable);
boolean checkMemory(boolean runOnFailure, Runnable runnable);
boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable runWhenBlocking);
boolean isFull();

View File

@ -691,11 +691,11 @@ public class PagingStoreImpl implements PagingStore {
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
return checkMemory(true, runWhenAvailable);
return checkMemory(true, runWhenAvailable, null);
}
@Override
public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable, Runnable runWhenBlocking) {
if (blockedViaAddressControl) {
if (runWhenAvailable != null) {
@ -713,6 +713,9 @@ public class PagingStoreImpl implements PagingStore {
}
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >= maxSize || pagingManager.isGlobalFull()) {
if (runWhenBlocking != null) {
runWhenBlocking.run();
}
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));

View File

@ -396,6 +396,10 @@ public class NettyAcceptor extends AbstractAcceptor {
}
}
public int getTcpReceiveBufferSize() {
return tcpReceiveBufferSize;
}
@Override
public synchronized void start() throws Exception {
if (channelClazz != null) {

View File

@ -410,7 +410,7 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
@Override
public boolean checkMemory(boolean runOnFailure, Runnable runnable) {
public boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable ignoredRunnable) {
return false;
}

View File

@ -103,7 +103,7 @@ under the License.
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300;actorThresholdBytes=10000</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

View File

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class FloodServerWithAsyncSendTest extends SmokeTestBase {
private static final Logger logger = Logger.getLogger(FloodServerWithAsyncSendTest.class);
public static final String SERVER_NAME_0 = "paging";
volatile boolean running = true;
AtomicInteger errors = new AtomicInteger(0);
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
startServer(SERVER_NAME_0, 0, 30000);
}
@Test
public void testAsyncPagingOpenWire() throws Exception {
String protocol = "OPENWIRE";
internalTest(protocol);
}
ConnectionFactory newCF(String protocol) {
if (protocol.equalsIgnoreCase("OPENWIRE")) {
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.useAsyncSend=true");
} else {
Assert.fail("unsuported protocol");
return null;
}
}
private void internalTest(String protocol) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(4);
try {
for (int i = 0; i < 2; i++) {
final String queueName = "queue" + i;
executorService.execute(() -> produce(protocol, queueName));
executorService.execute(() -> infiniteConsume(protocol, queueName));
}
Thread.sleep(10_000);
running = false;
executorService.shutdown();
Assert.assertTrue(executorService.awaitTermination(1, TimeUnit.MINUTES));
for (int i = 0; i < 2; i++) {
Assert.assertEquals("should have received at least a few messages", 20, consume(protocol, "queue" + i, 20));
}
ConnectionFactory factory = newCF("openwire");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue3");
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
String random = RandomUtil.randomString();
producer.send(session.createTextMessage(random));
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(random, message.getText());
connection.close();
Assert.assertEquals(0, errors.get());
} finally {
running = false;
executorService.shutdownNow(); // just to avoid thread leakage in case anything failed on the test
}
}
protected int infiniteConsume(String protocol, String queueName) {
ConnectionFactory factory = newCF(protocol);
Connection connection = null;
int rec = 0;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
while (running) {
if (consumer.receive(5000) != null) {
rec++;
} else {
break;
}
if (rec % 10 == 0) {
logger.info(queueName + " receive " + rec);
}
}
return rec;
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
return -1;
} finally {
try {
connection.close();
} catch (Exception ignored) {
}
}
}
protected int consume(String protocol, String queueName, int maxCount) throws Exception {
ConnectionFactory factory = newCF(protocol);
Connection connection = null;
int rec = 0;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
while (rec < maxCount) {
if (consumer.receive(5000) != null) {
rec++;
} else {
break;
}
if (rec % 10 == 0) {
logger.info(queueName + " receive " + rec);
}
}
return rec;
} finally {
try {
connection.close();
} catch (Exception ignored) {
}
}
}
protected void produce(String protocol, String queueName) {
int produced = 0;
ConnectionFactory factory = newCF(protocol);
Connection connection = null;
try {
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
String randomString;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 10000) {
buffer.append(RandomUtil.randomString());
}
randomString = buffer.toString();
}
while (running) {
if (++produced % 10 == 0) {
logger.info(queueName + " produced " + produced + " messages");
}
producer.send(session.createTextMessage(randomString));
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
try {
connection.close();
} catch (Exception ignored) {
}
}
}
}