https://issues.apache.org/jira/browse/AMQ-3792: use of the failover transport incorrectly suppresses javax.jms.InvalidClientIDException when clientId is already in use. fixup with test and fix to test dependent on wrong behaviour

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1307142 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-03-29 22:05:18 +00:00
parent c6ec76e6f6
commit 33cdefaf2e
5 changed files with 70 additions and 44 deletions

View File

@ -62,18 +62,17 @@ public class ManagedTransportConnection extends TransportConnection {
}
}
public void doStop() throws Exception {
if (isStarting()) {
setPendingStop(true);
return;
@Override
public void stopAsync() {
if (!isStopping()) {
synchronized (this) {
unregisterMBean(byClientIdName);
unregisterMBean(byAddressName);
byClientIdName = null;
byAddressName = null;
}
}
synchronized (this) {
unregisterMBean(byClientIdName);
unregisterMBean(byAddressName);
byClientIdName = null;
byAddressName = null;
}
super.doStop();
super.stopAsync();
}
public Response processAddConnection(ConnectionInfo info) throws Exception {

View File

@ -236,17 +236,8 @@ public class RegionBroker extends EmptyBroker {
synchronized (clientIdSet) {
ConnectionContext oldContext = clientIdSet.get(clientId);
if (oldContext != null) {
if (context.isFaultTolerant() || context.isNetworkConnection()){
//remove the old connection
try{
removeConnection(oldContext, info, new Exception("remove stale client"));
}catch(Exception e){
LOG.warn("Failed to remove stale connection ",e);
}
}else{
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
+ oldContext.getConnection().getRemoteAddress());
}
} else {
clientIdSet.put(clientId, context);
}

View File

@ -16,12 +16,17 @@
*/
package org.apache.activemq;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,33 +36,64 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
protected Connection connection;
protected boolean transacted;
protected int authMode = Session.AUTO_ACKNOWLEDGE;
public boolean useFailover = false;
public static Test suite() {
return suite(ReconnectWithSameClientIDTest.class);
}
public void initCombosForTestReconnectMultipleTimesWithSameClientID() {
addCombinationValues("useFailover", new Object[]{Boolean.FALSE, Boolean.TRUE});
}
public void testReconnectMultipleTimesWithSameClientID() throws Exception {
connection = connectionFactory.createConnection();
useConnection(connection);
// now lets create another which should fail
for (int i = 1; i < 11; i++) {
Connection connection2 = connectionFactory.createConnection();
try {
useConnection(connection2);
fail("Should have thrown InvalidClientIDException on attempt" + i);
} catch (InvalidClientIDException e) {
connection2.close();
LOG.info("Caught expected: " + e);
org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.jmx.ManagedTransportConnection.class);
final AtomicBoolean failed = new AtomicBoolean(false);
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getMessage().toString().startsWith("Failed to register MBean")) {
LOG.info("received unexpected log message: " + event.getMessage());
failed.set(true);
}
}
}
};
log4jLogger.addAppender(appender);
try {
connection = connectionFactory.createConnection();
useConnection(connection);
// now lets try closing the original connection and creating a new
// connection with the same ID
connection.close();
connection = connectionFactory.createConnection();
useConnection(connection);
// now lets create another which should fail
for (int i = 1; i < 11; i++) {
Connection connection2 = connectionFactory.createConnection();
try {
useConnection(connection2);
fail("Should have thrown InvalidClientIDException on attempt" + i);
} catch (InvalidClientIDException e) {
LOG.info("Caught expected: " + e);
} finally {
connection2.close();
}
}
// now lets try closing the original connection and creating a new
// connection with the same ID
connection.close();
connection = connectionFactory.createConnection();
useConnection(connection);
} finally {
log4jLogger.removeAppender(appender);
}
assertFalse("failed on unexpected log event", failed.get());
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
return new ActiveMQConnectionFactory((useFailover ? "failover:" : "") +
broker.getTransportConnectors().get(0).getPublishableConnectString());
}
protected void setUp() throws Exception {

View File

@ -48,7 +48,6 @@ public class AMQ2580Test extends TestSupport {
private Session session;
private MessageProducer producer;
private ConnectionFactory connectionFactory;
private TopicConnection topicConnection;
private BrokerService service;
public static Test suite() {
@ -195,8 +194,8 @@ public class AMQ2580Test extends TestSupport {
}
private void initTopic() throws JMSException {
topicConnection = (TopicConnection) connectionFactory.createConnection();
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
initConnection();
TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = topicSession.createTopic(TOPIC_NAME);
}
}

View File

@ -26,14 +26,14 @@ import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.ErrorHandler;
import org.apache.log4j.spi.Filter;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.LoggerFactory;
public class AMQ2902Test extends TestCase {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AMQ2580Test.class);
final AtomicBoolean gotExceptionInLog = new AtomicBoolean(Boolean.FALSE);
final AtomicBoolean failedToFindMDC = new AtomicBoolean(Boolean.FALSE);
@ -42,6 +42,7 @@ public class AMQ2902Test extends TestCase {
public void doAppend(LoggingEvent event) {
if (event.getThrowableInformation() != null
&& event.getThrowableInformation().getThrowable() instanceof TransportDisposedIOException) {
LOG.error("got event: " + event + ", ex:" + event.getThrowableInformation().getThrowable(), event.getThrowableInformation().getThrowable());
gotExceptionInLog.set(Boolean.TRUE);
}
if (event.getMDC("activemq.broker") == null) {