ARTEMIS-4522 Dedicated thread pool for flow-control-executor
This commit is contained in:
parent
d4d4b06fc7
commit
ff2b76c252
|
@ -51,6 +51,8 @@ public final class ActiveMQClient {
|
|||
|
||||
private static int globalScheduledThreadPoolSize;
|
||||
|
||||
private static int flowControlThreadPoolSize;
|
||||
|
||||
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
|
||||
|
||||
public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = ActiveMQDefaultConfiguration.getDefaultClientFailureCheckPeriod();
|
||||
|
@ -130,6 +132,8 @@ public final class ActiveMQClient {
|
|||
|
||||
public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
|
||||
|
||||
public static final int DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE = 10;
|
||||
|
||||
public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
|
||||
|
||||
public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
|
||||
|
@ -148,6 +152,8 @@ public final class ActiveMQClient {
|
|||
|
||||
private static ExecutorService globalThreadPool;
|
||||
|
||||
private static ExecutorService flowControlThreadPool;
|
||||
|
||||
private static boolean injectedPools = false;
|
||||
|
||||
private static ScheduledExecutorService globalScheduledThreadPool;
|
||||
|
@ -165,6 +171,7 @@ public final class ActiveMQClient {
|
|||
if (injectedPools) {
|
||||
globalThreadPool = null;
|
||||
globalScheduledThreadPool = null;
|
||||
flowControlThreadPool = null;
|
||||
injectedPools = false;
|
||||
return;
|
||||
}
|
||||
|
@ -196,6 +203,20 @@ public final class ActiveMQClient {
|
|||
globalScheduledThreadPool = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (flowControlThreadPool != null) {
|
||||
flowControlThreadPool.shutdownNow();
|
||||
try {
|
||||
if (!flowControlThreadPool.awaitTermination(time, unit)) {
|
||||
flowControlThreadPool.shutdownNow();
|
||||
ActiveMQClientLogger.LOGGER.unableToProcessScheduledlIn10Sec();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new ActiveMQInterruptedException(e);
|
||||
} finally {
|
||||
flowControlThreadPool = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -232,6 +253,25 @@ public final class ActiveMQClient {
|
|||
return globalThreadPool;
|
||||
}
|
||||
|
||||
|
||||
public static synchronized ExecutorService getFlowControlThreadPool() {
|
||||
if (flowControlThreadPool == null) {
|
||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
||||
@Override
|
||||
public ThreadFactory run() {
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-flow-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
|
||||
}
|
||||
});
|
||||
|
||||
if (flowControlThreadPoolSize == -1) {
|
||||
flowControlThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
|
||||
} else {
|
||||
flowControlThreadPool = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.flowControlThreadPoolSize, 60L, TimeUnit.SECONDS, factory);
|
||||
}
|
||||
}
|
||||
return flowControlThreadPool;
|
||||
}
|
||||
|
||||
public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
|
||||
if (globalScheduledThreadPool == null) {
|
||||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
||||
|
@ -288,6 +328,7 @@ public final class ActiveMQClient {
|
|||
|
||||
ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
|
||||
ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize;
|
||||
ActiveMQClient.flowControlThreadPoolSize = DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -119,6 +119,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
private final Executor closeExecutor;
|
||||
|
||||
private final Executor flowControlExecutor;
|
||||
|
||||
private RemotingConnection connection;
|
||||
|
||||
private final long retryInterval;
|
||||
|
@ -173,24 +175,27 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
final int reconnectAttempts,
|
||||
final Executor threadPool,
|
||||
final ScheduledExecutorService scheduledThreadPool,
|
||||
final Executor flowControlPool,
|
||||
final List<Interceptor> incomingInterceptors,
|
||||
final List<Interceptor> outgoingInterceptors) {
|
||||
this(serverLocator, new Pair<>(connectorConfig, null),
|
||||
locatorConfig, reconnectAttempts, threadPool,
|
||||
scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
|
||||
scheduledThreadPool,flowControlPool, incomingInterceptors, outgoingInterceptors);
|
||||
}
|
||||
|
||||
|
||||
ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
|
||||
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
|
||||
final ServerLocatorConfig locatorConfig,
|
||||
final int reconnectAttempts,
|
||||
final Executor threadPool,
|
||||
final ScheduledExecutorService scheduledThreadPool,
|
||||
final List<Interceptor> incomingInterceptors,
|
||||
final List<Interceptor> outgoingInterceptors) {
|
||||
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
|
||||
final ServerLocatorConfig locatorConfig,
|
||||
final int reconnectAttempts,
|
||||
final Executor threadPool,
|
||||
final ScheduledExecutorService scheduledThreadPool,
|
||||
final Executor flowControlPool,
|
||||
final List<Interceptor> incomingInterceptors,
|
||||
final List<Interceptor> outgoingInterceptors) {
|
||||
this(serverLocator, connectorConfig,
|
||||
locatorConfig, reconnectAttempts, threadPool,
|
||||
scheduledThreadPool, incomingInterceptors, outgoingInterceptors, null);
|
||||
locatorConfig, reconnectAttempts, threadPool,
|
||||
scheduledThreadPool, flowControlPool,incomingInterceptors, outgoingInterceptors, null);
|
||||
}
|
||||
|
||||
ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
|
||||
|
@ -199,6 +204,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
final int reconnectAttempts,
|
||||
final Executor threadPool,
|
||||
final ScheduledExecutorService scheduledThreadPool,
|
||||
final Executor flowControlPoll,
|
||||
final List<Interceptor> incomingInterceptors,
|
||||
final List<Interceptor> outgoingInterceptors,
|
||||
final TransportConfiguration[] connectorConfigs) {
|
||||
|
@ -250,6 +256,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
|
||||
|
||||
flowControlExecutor = new OrderedExecutorFactory(flowControlPoll).getExecutor();
|
||||
|
||||
closeExecutor = orderedExecutorFactory.getExecutor();
|
||||
|
||||
this.incomingInterceptors = incomingInterceptors;
|
||||
|
@ -836,7 +844,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
|
||||
|
||||
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
|
||||
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor());
|
||||
|
||||
synchronized (sessions) {
|
||||
if (closed || !clientProtocolManager.isAlive()) {
|
||||
|
|
|
@ -137,6 +137,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
|
||||
private transient Executor threadPool;
|
||||
|
||||
private transient Executor flowControlPool;
|
||||
private transient ScheduledExecutorService scheduledThreadPool;
|
||||
|
||||
private transient DiscoveryGroup discoveryGroup;
|
||||
|
@ -188,12 +189,15 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
ActiveMQClient.clearThreadPools();
|
||||
}
|
||||
|
||||
|
||||
private synchronized void setThreadPools() {
|
||||
if (threadPool != null) {
|
||||
return;
|
||||
} else if (config.useGlobalPools) {
|
||||
threadPool = ActiveMQClient.getGlobalThreadPool();
|
||||
|
||||
flowControlPool = ActiveMQClient.getFlowControlThreadPool(); //TODO add option for config
|
||||
|
||||
scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
|
||||
} else {
|
||||
this.shutdownPool = true;
|
||||
|
@ -201,7 +205,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
||||
@Override
|
||||
public ThreadFactory run() {
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ServerLocatorImpl.class.getClassLoader());
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true,
|
||||
ServerLocatorImpl.class.getClassLoader());
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -214,10 +219,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
|
||||
@Override
|
||||
public ThreadFactory run() {
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-factory-pinger-threads-" + System.identityHashCode(this), true, ClientSessionFactoryImpl.class.getClassLoader());
|
||||
return new ActiveMQThreadFactory("ActiveMQ-client-factory-pinger-threads-" + System.identityHashCode(this), true,
|
||||
ClientSessionFactoryImpl.class.getClassLoader());
|
||||
}
|
||||
});
|
||||
|
||||
flowControlPool = ActiveMQClient.getFlowControlThreadPool(); //TODO add option for config;
|
||||
scheduledThreadPool = Executors.newScheduledThreadPool(config.scheduledThreadPoolMaxSize, factory);
|
||||
}
|
||||
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
|
||||
|
@ -625,7 +632,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
|
||||
initialize();
|
||||
|
||||
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
|
||||
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool,incomingInterceptors, outgoingInterceptors);
|
||||
|
||||
addToConnecting(factory);
|
||||
try {
|
||||
|
@ -701,7 +708,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
// try each factory in the list until we find one which works
|
||||
|
||||
try {
|
||||
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors, initialConnectors);
|
||||
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool, incomingInterceptors, outgoingInterceptors, initialConnectors);
|
||||
try {
|
||||
addToConnecting(factory);
|
||||
// We always try to connect here with only one attempt,
|
||||
|
@ -1804,7 +1811,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
connectors = new ArrayList<>();
|
||||
if (initialConnectors != null) {
|
||||
for (TransportConfiguration initialConnector : initialConnectors) {
|
||||
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
|
||||
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool,flowControlPool, incomingInterceptors, outgoingInterceptors);
|
||||
|
||||
connectors.add(new Connector(initialConnector, factory));
|
||||
}
|
||||
|
|
|
@ -157,6 +157,7 @@ public class BackupActivationNoReconnectTest {
|
|||
final ServerLocatorConfig locatorConfig = Mockito.mock(ServerLocatorConfig.class);
|
||||
final int reconnectAttempts = 1;
|
||||
final Executor threadPool = Mockito.mock(Executor.class);
|
||||
final Executor flowControlPool = Mockito.mock(Executor.class);
|
||||
final ScheduledExecutorService scheduledThreadPool = Mockito.mock(ScheduledExecutorService.class);
|
||||
final ClientProtocolManager clientProtocolManager = Mockito.mock(ClientProtocolManager.class);
|
||||
when(serverLocator.newProtocolManager()).thenReturn(clientProtocolManager);
|
||||
|
@ -164,7 +165,7 @@ public class BackupActivationNoReconnectTest {
|
|||
Map<String, Object> urlParams = new HashMap<>();
|
||||
urlParams.put("port", serverSocket.getLocalPort());
|
||||
when(connectorConfig.getCombinedParams()).thenReturn(urlParams);
|
||||
ClientSessionFactoryImpl sessionFactory = new ClientSessionFactoryImpl(serverLocator, connectorConfig, locatorConfig, reconnectAttempts, threadPool, scheduledThreadPool, null, null);
|
||||
ClientSessionFactoryImpl sessionFactory = new ClientSessionFactoryImpl(serverLocator, connectorConfig, locatorConfig, reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool,null, null);
|
||||
when(clusterControl.getSessionFactory()).thenReturn(sessionFactory);
|
||||
when(clientProtocolManager.isAlive()).thenReturn(true);
|
||||
|
||||
|
@ -270,4 +271,4 @@ public class BackupActivationNoReconnectTest {
|
|||
assertFalse(reconnectWorkOnRetry.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,215 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Test for slow consumer handling getting influenced by thread pool exhaustion.
|
||||
* See ARTEMIS-4522.
|
||||
* <p/>
|
||||
* Test is easily reproducing the issue described in ARTEMIS-4522 with settings of 2000 {@link #NUM_OF_MESSAGES}, 100 {@link #CONSUMER_COUNT} and 50 {@link #THREAD_POOL_SIZE_CLIENT} on an 8 core machine.
|
||||
* Fewer messages and fewer consumers make the probability lower for the issue to appear.
|
||||
* More client-threads make the issue less likely to appear.
|
||||
* Theory: {@link #THREAD_POOL_SIZE_CLIENT} being larger than {@link #CONSUMER_COUNT} will make the issue impossible to appear.
|
||||
* <p/>
|
||||
* With 2000 messages, the test usually runs into the issue of ClientConsumerImpl#startSlowConsumer reaching its 10-second timeout (or taking significant long between 1 and 10 seconds)
|
||||
* on pendingFlowControl#await after about 1200-1500 messages.
|
||||
* Visible in log as 10 second pause before next bulk of messages get processed.
|
||||
*/
|
||||
public class ConsumerSlowConsumerTest extends ActiveMQTestBase {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue");
|
||||
|
||||
private ServerLocator locator;
|
||||
|
||||
private static final int WINDOW_SIZE = 0;
|
||||
|
||||
private static final int NUM_OF_MESSAGES = 2_000;
|
||||
private static final int CONSUMER_COUNT = 500;
|
||||
private static final int THREAD_POOL_SIZE_CLIENT = 5;
|
||||
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
private final Set<ClientConsumerImpl> consumers = ConcurrentHashMap.newKeySet();
|
||||
private final Set<ClientSession> sessions = ConcurrentHashMap.newKeySet();
|
||||
|
||||
|
||||
protected boolean isNetty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
locator = createFactory(isNetty());
|
||||
locator.setConsumerWindowSize(WINDOW_SIZE);
|
||||
locator.setUseGlobalPools(false);
|
||||
locator.setThreadPoolMaxSize(THREAD_POOL_SIZE_CLIENT);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSlowConsumer() throws Exception {
|
||||
ActiveMQServer messagingService = createServer(false, isNetty());
|
||||
|
||||
messagingService.start();
|
||||
messagingService.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
ClientSessionFactory cf = createSessionFactory(locator);
|
||||
|
||||
AtomicInteger sentMessages = new AtomicInteger(0);
|
||||
sendMessages(cf, sentMessages);
|
||||
|
||||
AtomicInteger receivedMessages = new AtomicInteger(0);
|
||||
createConsumers(cf, receivedMessages);
|
||||
|
||||
final long startTime = System.currentTimeMillis();
|
||||
// allow for duration of 5ms per message (neglecting concurrency, which makes it even a lot faster)
|
||||
// typical runtime for 1000 messages 100 consumers and 50 threads without issues is 160ms, so deadline is very generous
|
||||
final long deadLine = System.currentTimeMillis() + sentMessages.get() * 5L;
|
||||
int counter = 0;
|
||||
while (receivedMessages.get() < sentMessages.get() && System.currentTimeMillis() < deadLine) {
|
||||
Thread.sleep(5);
|
||||
counter++;
|
||||
if (counter % 1000 == 0) {
|
||||
logger.info("Waiting for " + (sentMessages.get() - receivedMessages.get()) + " more messages...");
|
||||
}
|
||||
}
|
||||
final long endTime = System.currentTimeMillis();
|
||||
stopped = true; // signal stop to potentially still running consumer-creation thread
|
||||
|
||||
// check amount of sent and received messages
|
||||
if (receivedMessages.get() < sentMessages.get()) {
|
||||
logger.error("Received only " + receivedMessages.get() + " messages out of " + sentMessages.get());
|
||||
} else {
|
||||
logger.info("Received all " + receivedMessages.get() + " messages");
|
||||
}
|
||||
assertEquals(sentMessages.get(), receivedMessages.get());
|
||||
|
||||
final long duration = endTime - startTime;
|
||||
logger.info("Test took " + duration + " ms");
|
||||
|
||||
long expectedDuration = NUM_OF_MESSAGES * 10;
|
||||
|
||||
assertTrue("Test took " + duration + " ms, expected " + expectedDuration + " ms", duration < expectedDuration);
|
||||
|
||||
cleanup(cf, messagingService);
|
||||
}
|
||||
|
||||
private void cleanup(ClientSessionFactory cf, ActiveMQServer messagingService) throws Exception {
|
||||
consumers.parallelStream().forEach(c -> {
|
||||
try {
|
||||
c.close();
|
||||
} catch (ActiveMQException e) {
|
||||
//ignore
|
||||
}
|
||||
});
|
||||
logger.info("Closed " + consumers.size() + " consumers");
|
||||
sessions.parallelStream().forEach(s -> {
|
||||
try {
|
||||
s.close();
|
||||
} catch (ActiveMQException e) {
|
||||
//ignore
|
||||
}
|
||||
});
|
||||
logger.info("Closed " + sessions.size() + " sessions");
|
||||
|
||||
cf.close();
|
||||
messagingService.stop();
|
||||
|
||||
logger.info("Cleaned up.");
|
||||
}
|
||||
|
||||
private void sendMessages(ClientSessionFactory cf, AtomicInteger sentMessages) throws ActiveMQException {
|
||||
logger.info("Creating " + NUM_OF_MESSAGES + " messages...");
|
||||
|
||||
try (ClientSession sendingSession = cf.createSession(false, true, true);
|
||||
ClientProducer producer = sendingSession.createProducer(QUEUE)) {
|
||||
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
|
||||
ClientMessage message = createTextMessage(sendingSession, "m" + i);
|
||||
producer.send(message);
|
||||
sentMessages.incrementAndGet();
|
||||
}
|
||||
logger.info("Created " + NUM_OF_MESSAGES + " messages");
|
||||
}
|
||||
}
|
||||
|
||||
private void createConsumers(ClientSessionFactory cf, AtomicInteger receivedMessages) throws ActiveMQException {
|
||||
Thread consumerCreator = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
logger.info("Creating " + CONSUMER_COUNT + " consumers...");
|
||||
try {
|
||||
for (int i = 0; i < CONSUMER_COUNT; i++) {
|
||||
if (stopped) {
|
||||
logger.info("Stopping consumer creation, since test has ended already. Created " + i + " out of " + CONSUMER_COUNT + " consumers so far.");
|
||||
return;
|
||||
}
|
||||
|
||||
ClientSession session = cf.createSession(false, true, true);
|
||||
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(QUEUE);
|
||||
sessions.add(session);
|
||||
consumers.add(consumer);
|
||||
|
||||
assertEquals(WINDOW_SIZE == 0 ? 0 : WINDOW_SIZE / 2, consumer.getClientWindowSize());
|
||||
String consumerId = "[" + i + "]";
|
||||
consumer.setMessageHandler(message -> {
|
||||
Thread.yield(); // simulate processing and yield to other threads
|
||||
receivedMessages.incrementAndGet();
|
||||
//instanceLog.info(consumerId + "\t- Received message: " + message.getMessageID());
|
||||
});
|
||||
|
||||
session.start();
|
||||
}
|
||||
|
||||
logger.info("Created all " + CONSUMER_COUNT + " consumers.");
|
||||
} catch (Exception ex) {
|
||||
logger.error("Error creating consumers!", ex);
|
||||
//fail("Error creating consumers!");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
consumerCreator.start();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue