https://issues.apache.org/jira/browse/AMQ-3393 - Number of established STOMP connections constantly increasing. resolve contention over delayed stop. add count attribute to connector to see current number of connections

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1143080 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-07-05 14:37:39 +00:00
parent 3cce6b7cbb
commit 4cf6dae48b
7 changed files with 116 additions and 17 deletions

View File

@ -59,4 +59,5 @@ public interface Connector extends Service {
*/
public boolean isUpdateClusterClientsOnRemove();
int connectionCount();
}

View File

@ -894,9 +894,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
public void start() throws Exception {
starting = true;
try {
synchronized (this) {
starting = true;
if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ getRemoteAddress());
@ -923,22 +923,15 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// stop() can be called from within the above block,
// but we want to be sure start() completes before
// stop() runs, so queue the stop until right now:
starting = false;
if (pendingStop) {
LOG.debug("Calling the delayed stop()");
setStarting(false);
if (isPendingStop()) {
LOG.debug("Calling the delayed stop() after start() " + this);
stop();
}
}
}
public void stop() throws Exception {
synchronized (this) {
pendingStop = true;
if (starting) {
LOG.debug("stop() called in the middle of start(). Delaying...");
return;
}
}
stopAsync();
while (!stopped.await(5, TimeUnit.SECONDS)) {
LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
@ -946,8 +939,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
public void stopAsync() {
// If we're in the middle of starting
// then go no further... for now.
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
pendingStop = true;
if (starting) {
LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
return;
}
}
if (stopping.compareAndSet(false, true)) {
// Let all the connection contexts know we are shutting down
// so that in progress operations can notice and unblock.
@ -962,8 +961,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
try {
doStop();
} catch (Throwable e) {
LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
+ "': ", e);
LOG.debug("Error occurred while shutting down a connection " + this, e);
} finally {
stopped.countDown();
serviceLock.writeLock().unlock();

View File

@ -548,4 +548,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
this.updateClusterFilter = updateClusterFilter;
}
public int connectionCount() {
return connections.size();
}
}

View File

@ -77,4 +77,11 @@ public class ConnectorView implements ConnectorViewMBean {
return connector.getStatistics().isEnabled();
}
/**
* Returns the number of current connections
*/
public int connectionCount() {
return connector.connectionCount();
}
}

View File

@ -19,6 +19,9 @@ package org.apache.activemq.broker.jmx;
import org.apache.activemq.Service;
public interface ConnectorViewMBean extends Service {
@MBeanInfo("Connection count")
int connectionCount();
/**
* Resets the statistics

View File

@ -116,7 +116,6 @@ public class ManagedTransportConnection extends TransportConnection {
}
protected ObjectName createByAddressObjectName(String type, String value) throws IOException {
// Build the object name for the destination
Hashtable map = connectorName.getKeyPropertyList();
try {
return new ObjectName(connectorName.getDomain() + ":" + "BrokerName="
@ -131,7 +130,6 @@ public class ManagedTransportConnection extends TransportConnection {
}
protected ObjectName createByClientIdObjectName(String value) throws IOException {
// Build the object name for the destination
Hashtable map = connectorName.getKeyPropertyList();
try {
return new ObjectName(connectorName.getDomain() + ":" + "BrokerName="

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.util.Vector;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static junit.framework.Assert.assertTrue;
// https://issues.apache.org/jira/browse/AMQ-3393
public class ConnectTest {
private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class);
BrokerService brokerService;
Vector<Throwable> exceptions = new Vector<Throwable>();
@Before
public void startBroker() throws Exception {
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.addConnector("stomp://0.0.0.0:61612");
brokerService.start();
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
}
@Test
public void testStompConnectLeak() throws Exception {
Thread t1 = new Thread() {
StompConnection connection = new StompConnection();
public void run() {
try {
connection.open("localhost", 61612);
connection.connect("system", "manager");
connection.disconnect();
} catch (Exception ex) {
LOG.error("unexpected exception on connect/disconnect", ex);
exceptions.add(ex);
}
}
};
int i = 0;
long done = System.currentTimeMillis() + (60 * 1000 * 2);
while (System.currentTimeMillis() < done) {
t1.run();
if (++i % 5000 == 0) {
LOG.info("connection count on stomp connector:" + brokerService.getTransportConnectors().get(0).connectionCount());
}
}
assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
}
}));
assertTrue("no exceptions", exceptions.isEmpty());
}
}