From 3f9b6ed391784e75598fee1fa23ac897a98af57a Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Fri, 12 Apr 2013 23:02:48 +0000 Subject: [PATCH] fix and test for: https://issues.apache.org/jira/browse/AMQ-4469 Rewrote the unit test as a JUnit 4 test and remove hard coded 61616 port dep. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1467510 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/tcp/TcpTransportServer.java | 22 ++-- .../org/apache/activemq/bugs/AMQ4469Test.java | 114 ++++++++++++++++++ 2 files changed, 128 insertions(+), 8 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 490252af90..4caff76b4c 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ServerSocketFactory; @@ -113,7 +114,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements * The maximum number of sockets allowed for this server */ protected int maximumConnections = Integer.MAX_VALUE; - protected int currentTransportCount=0; + protected AtomicInteger currentTransportCount = new AtomicInteger(); public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); @@ -177,6 +178,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements * * @param brokerInfo */ + @Override public void setBrokerInfo(BrokerInfo brokerInfo) { } @@ -267,6 +269,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements /** * pull Sockets from the ServerSocket */ + @Override public void run() { while (!isStopped()) { Socket socket = null; @@ -312,6 +315,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements /** * @return pretty print of this */ + @Override public String toString() { return "" + getBindLocation(); } @@ -337,9 +341,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements return result; } + @Override protected void doStart() throws Exception { if(useQueueForAccept) { Runnable run = new Runnable() { + @Override public void run() { try { while (!isStopped() && !isStopping()) { @@ -355,9 +361,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements onAcceptError(e); } } - } - }; socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), @@ -367,9 +371,9 @@ public class TcpTransportServer extends TransportServerThreadSupport implements socketHandlerThread.start(); } super.doStart(); - } + @Override protected void doStop(ServiceStopper stopper) throws Exception { super.doStop(stopper); if (serverSocket != null) { @@ -377,13 +381,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements } } + @Override public InetSocketAddress getSocketAddress() { return (InetSocketAddress)serverSocket.getLocalSocketAddress(); } protected final void handleSocket(Socket socket) { try { - if (this.currentTransportCount >= this.maximumConnections) { + if (this.currentTransportCount.get() >= this.maximumConnections) { throw new ExceededMaximumConnectionsException("Exceeded the maximum " + "number of allowed client connections. See the 'maximumConnections' " + "property on the TCP transport configuration URI in the ActiveMQ " + @@ -416,6 +421,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements transportFactory.serverConfigure( transport, format, options); getAcceptListener().onAccept(configuredTransport); + currentTransportCount.incrementAndGet(); } } catch (SocketTimeoutException ste) { // expect this to happen @@ -427,7 +433,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements onAcceptError(e); } } - } public int getSoTimeout() { @@ -468,12 +473,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements this.maximumConnections = maximumConnections; } + @Override public void started(Service service) { - this.currentTransportCount++; } + @Override public void stopped(Service service) { - this.currentTransportCount--; + this.currentTransportCount.decrementAndGet(); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java new file mode 100644 index 0000000000..2f7ae69f18 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jms.support.JmsUtils; + +public class AMQ4469Test { + + private static final int maxConnections = 100; + + private final ExecutorService executor = Executors.newCachedThreadPool(); + private String connectionUri; + private BrokerService service; + private TransportConnector connector; + + @Before + public void setUp() throws Exception { + service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + connector = service.addConnector("tcp://0.0.0.0:0?maximumConnections="+maxConnections); + connectionUri = connector.getPublishableConnectString(); + service.start(); + service.waitUntilStarted(); + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connectionUri); + } + + @Test + public void testMaxConnectionControl() throws Exception { + final ConnectionFactory cf = createConnectionFactory(); + final CountDownLatch startupLatch = new CountDownLatch(1); + for(int i = 0; i < maxConnections + 20; i++) { + executor.submit(new Runnable() { + @Override + public void run() { + Connection conn = null; + try { + startupLatch.await(); + conn = cf.createConnection(); + conn.start(); + } catch (Exception e) { + e.printStackTrace(); + JmsUtils.closeConnection(conn); + } + } + }); + } + + TcpTransportServer transportServer = (TcpTransportServer)connector.getServer(); + // ensure the max connections is in effect + assertEquals(maxConnections, transportServer.getMaximumConnections()); + // No connections at first + assertEquals(0, connector.getConnections().size()); + // Release the latch to set up connections in parallel + startupLatch.countDown(); + TimeUnit.SECONDS.sleep(5); + + final TransportConnector connector = this.connector; + + // Expect the max connections is created + assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(), + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connector.getConnections().size() == maxConnections; + } + }) + ); + } + + @After + public void tearDown() throws Exception { + executor.shutdown(); + + service.stop(); + service.waitUntilStopped(); + } +}