From 634b42016a4c347217129d49a4175afaba9666ed Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 28 Sep 2016 12:15:50 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5486 - allow selector manager to reject tasks - org.apache.activemq.transport.nio.SelectorManager.rejectWork leaving the default to caller runs policy. This allows a broker to implement qos for existing connections by forcing others away --- .../transport/nio/SelectorManager.java | 11 +- .../org/apache/activemq/bugs/AMQ5486Test.java | 135 ++++++++++++++++++ .../nio/NIOAsyncSendWithPFCTest.java | 4 - 3 files changed, 145 insertions(+), 5 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5486Test.java diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java index b6b1f50b8f..28d2559c20 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java @@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -56,16 +57,24 @@ public final class SelectorManager { t.setDaemon(true); return t; } - }, new ThreadPoolExecutor.CallerRunsPolicy()); + }, newRejectionHandler()); return rc; } + private RejectedExecutionHandler newRejectionHandler() { + return canRejectWork() ? new ThreadPoolExecutor.AbortPolicy() : new ThreadPoolExecutor.CallerRunsPolicy(); + } + private BlockingQueue newWorkQueue() { final int workQueueCapicity = getDefaultWorkQueueCapacity(); return workQueueCapicity > 0 ? new LinkedBlockingQueue(workQueueCapicity) : new SynchronousQueue(); } + private static boolean canRejectWork() { + return Boolean.getBoolean("org.apache.activemq.transport.nio.SelectorManager.rejectWork"); + } + private static int getDefaultWorkQueueCapacity() { return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", 0); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5486Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5486Test.java new file mode 100644 index 0000000000..675d660cf1 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5486Test.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jms.support.JmsUtils; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import java.util.LinkedList; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class AMQ5486Test { + + private static final int maxConnections = 100; + private static final int maxPoolSize = 10; + + private final ExecutorService executor = Executors.newCachedThreadPool(); + private String connectionUri; + private BrokerService service; + private TransportConnector connector; + final ConcurrentLinkedQueue connections = new ConcurrentLinkedQueue(); + + @Before + public void setUp() throws Exception { + + // max out the pool and reject work + System.setProperty("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", String.valueOf(maxPoolSize)); + System.setProperty("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", "0"); + System.setProperty("org.apache.activemq.transport.nio.SelectorManager.rejectWork", "true"); + service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + connector = service.addConnector("nio://0.0.0.0:0"); + connectionUri = connector.getPublishableConnectString(); + service.start(); + service.waitUntilStarted(); + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connectionUri); + } + + @Test + public void testFailureOnSelectorThreadPoolExhaustion() throws Exception { + final ConnectionFactory cf = createConnectionFactory(); + final CountDownLatch startupLatch = new CountDownLatch(1); + final LinkedList exceptions = new LinkedList(); + for(int i = 0; i < maxConnections; i++) { + executor.submit(new Runnable() { + @Override + public void run() { + ActiveMQConnection conn = null; + try { + startupLatch.await(); + conn = (ActiveMQConnection) cf.createConnection(); + conn.start(); + //conn.syncSendPacket(new TransactionInfo(conn.getConnectionInfo().getConnectionId(), null, TransactionInfo.END)); + connections.add(conn); + } catch (Exception e) { + exceptions.add(e); + JmsUtils.closeConnection(conn); + } + } + }); + } + + // No connections at first + assertEquals(0, connector.getConnections().size()); + // Release the latch to set up connections in parallel + startupLatch.countDown(); + + final TransportConnector connector = this.connector; + + + // Expect the max connections is created + assertTrue("Expected some exceptions", + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return !exceptions.isEmpty(); + } + }) + ); + + assertTrue("Expected: more than " + (maxPoolSize - 1) + " connections, found: " + connector.getConnections().size(), + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + // selector thread will take one thread from the pool + return connector.getConnections().size() >= maxPoolSize - 1; + } + }) + ); + } + + @After + public void tearDown() throws Exception { + executor.shutdownNow(); + + for (Connection connection : connections) { + JmsUtils.closeConnection(connection); + } + + service.stop(); + service.waitUntilStopped(); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java index 0b7b7c33b6..a9cc901e67 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java @@ -146,13 +146,9 @@ public class NIOAsyncSendWithPFCTest extends TestCase { } - //wait till producer follow control kicks in waitForProducerFlowControl(broker, queueView); - - TestSupport.dumpAllThreads("Blocked"); - try { Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (Exception ex) {