https://issues.apache.org/jira/browse/AMQ-6184 - add workQueueCapacity config property default to 0 where a value > 0 swaps out the dsynchQ for a capicity limited blocking queue. This allows the core pool to grow on demand as before but also allows work to be queued when necessary

This commit is contained in:
gtully 2016-09-28 09:51:05 +01:00
parent 45f60e4133
commit 08695ab303
2 changed files with 291 additions and 4 deletions

View File

@ -19,9 +19,11 @@ package org.apache.activemq.transport.nio;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.spi.AbstractSelectableChannel; import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -40,10 +42,10 @@ public final class SelectorManager {
private Executor selectorExecutor = createDefaultExecutor(); private Executor selectorExecutor = createDefaultExecutor();
private Executor channelExecutor = selectorExecutor; private Executor channelExecutor = selectorExecutor;
private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>(); private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
private int maxChannelsPerWorker = 1024; private int maxChannelsPerWorker = -1;
protected ExecutorService createDefaultExecutor() { protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, newWorkQueue(),
new ThreadFactory() { new ThreadFactory() {
private long i = 0; private long i = 0;
@ -59,8 +61,17 @@ public final class SelectorManager {
return rc; return rc;
} }
private BlockingQueue<Runnable> newWorkQueue() {
final int workQueueCapicity = getDefaultWorkQueueCapacity();
return workQueueCapicity > 0 ? new LinkedBlockingQueue<Runnable>(workQueueCapicity) : new SynchronousQueue<Runnable>();
}
private static int getDefaultWorkQueueCapacity() {
return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", 0);
}
private static int getDefaultCorePoolSize() { private static int getDefaultCorePoolSize() {
return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10); return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10);
} }
private static int getDefaultMaximumPoolSize() { private static int getDefaultMaximumPoolSize() {
@ -71,6 +82,10 @@ public final class SelectorManager {
return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30); return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30);
} }
private static int getDefaultMaxChannelsPerWorker() {
return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maxChannelsPerWorker", 1024);
}
public static SelectorManager getInstance() { public static SelectorManager getInstance() {
return SINGLETON; return SINGLETON;
} }
@ -124,7 +139,7 @@ public final class SelectorManager {
} }
public int getMaxChannelsPerWorker() { public int getMaxChannelsPerWorker() {
return maxChannelsPerWorker; return maxChannelsPerWorker >= 0 ? maxChannelsPerWorker : getDefaultMaxChannelsPerWorker();
} }
public void setMaxChannelsPerWorker(int maxChannelsPerWorker) { public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {

View File

@ -0,0 +1,272 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.nio;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.jmx.QueueView;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import javax.management.ObjectName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/*
demonstrates that with nio it does not make sense to block on the broker but thread pool
shold grow past initial corepoolsize of 10
*/
public class NIOAsyncSendWithPFCTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(NIOAsyncSendWithPFCTest.class);
private static String TRANSPORT_URL = "nio://0.0.0.0:0";
private static final String DESTINATION_ONE = "testQ1";
private static final String DESTINATION_TWO = "testQ2";
private static final int MESSAGES_TO_SEND = 100;
private static int NUMBER_OF_PRODUCERS = 10;
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
PolicyMap policyMap = new PolicyMap();
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
PolicyEntry pe = new PolicyEntry();
pe.setMemoryLimit(256000);
pe.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
pe.setQueue(">");
entries.add(pe);
policyMap.setPolicyEntries(entries);
broker.setDestinationPolicy(policyMap);
broker.addConnector(TRANSPORT_URL);
broker.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue(DESTINATION_ONE)});
broker.start();
TRANSPORT_URL = broker.getTransportConnectorByScheme("nio").getPublishableConnectString();
return broker;
}
/**
* Test creates 10 producer who send to a single destination using Async mode.
* Producer flow control kicks in for that destination. When producer flow control is blocking sends
* Test tries to create another JMS connection to the nio.
*/
public void testAsyncSendPFCNewConnection() throws Exception {
BrokerService broker = createBroker();
broker.waitUntilStarted();
ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
QueueView queueView = getQueueView(broker, DESTINATION_ONE);
try {
for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
executorService.submit(new ProducerTask());
}
//wait till producer follow control kicks in
waitForProducerFlowControl(broker, queueView);
try {
sendMessagesAsync(1, DESTINATION_TWO);
} catch (Exception ex) {
LOG.error("Ex on send new connection", ex);
fail("*** received the following exception when creating addition producer new connection:" + ex);
}
} finally {
broker.stop();
broker.waitUntilStopped();
}
}
public void testAsyncSendPFCExistingConnection() throws Exception {
BrokerService broker = createBroker();
broker.waitUntilStarted();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL + "?wireFormat.maxInactivityDuration=5000");
ActiveMQConnection exisitngConnection = (ActiveMQConnection) connectionFactory.createConnection();
ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
QueueView queueView = getQueueView(broker, DESTINATION_ONE);
try {
for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
executorService.submit(new ProducerTask());
}
//wait till producer follow control kicks in
waitForProducerFlowControl(broker, queueView);
TestSupport.dumpAllThreads("Blocked");
try {
Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (Exception ex) {
LOG.error("Ex on create session", ex);
fail("*** received the following exception when creating producer session:" + ex);
}
} finally {
broker.stop();
broker.waitUntilStopped();
}
}
private void waitForProducerFlowControl(BrokerService broker, QueueView queueView) throws Exception {
boolean blockingAllSends;
do {
blockingAllSends = queueView.getBlockedSends() > 10;
Thread.sleep(1000);
} while (!blockingAllSends);
}
class ProducerTask implements Runnable {
@Override
public void run() {
try {
//send X messages
sendMessagesAsync(MESSAGES_TO_SEND, DESTINATION_ONE);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private Long sendMessagesAsync(int messageCount, String destination) throws Exception {
long numberOfMessageSent = 0;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.setUseAsyncSend(true);
connection.start();
try {
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer jmsProducer = producerSession.createProducer(producerSession.createQueue(destination));
Message sendMessage = createTextMessage(producerSession);
for (int i = 0; i < messageCount; i++) {
jmsProducer.send(sendMessage);
numberOfMessageSent++;
}
LOG.info(" Finished after producing : " + numberOfMessageSent);
return numberOfMessageSent;
} catch (Exception ex) {
LOG.info("Exception received producing ", ex);
LOG.info("finishing after exception :" + numberOfMessageSent);
return numberOfMessageSent;
} finally {
if (connection != null) {
connection.close();
}
}
}
private TextMessage createTextMessage(Session session) throws JMSException {
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < 1000; i++) {
buffer.append("1234567890");
}
return session.createTextMessage(buffer.toString());
}
private QueueView getQueueView(BrokerService broker, String queueName) throws Exception {
Map<ObjectName, DestinationView> queueViews = broker.getAdminView().getBroker().getQueueViews();
for (ObjectName key : queueViews.keySet()) {
DestinationView destinationView = queueViews.get(key);
if (destinationView instanceof QueueView) {
QueueView queueView = (QueueView) destinationView;
if (queueView.getName().equals(queueName)) {
return queueView;
}
}
}
return null;
}
}