diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index 2ee8759b99..a1293b70bd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; +import java.util.*; import java.util.List; import org.apache.activemq.Service; @@ -61,6 +61,8 @@ import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.jms.InvalidClientIDException; + import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; @@ -447,12 +449,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C public Response processAddConnection(ConnectionInfo info) throws Throwable { // Setup the context. + String clientId = info.getClientId(); ConnectionContext context = new ConnectionContext(); context.setConnection(this); context.setBroker(broker); context.setConnector(connector); context.setTransactions(new ConcurrentHashMap()); - String clientId = info.getClientId(); context.setClientId(clientId); context.setUserName(info.getUserName()); context.setConnectionId(info.getConnectionId()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 93198a43ca..af490ead7e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -40,10 +40,13 @@ import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; +import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import java.io.IOException; import java.util.ArrayList; +import java.util.*; +import java.util.Set; /** * Routes Broker operations to the correct messaging regions for processing. @@ -67,6 +70,7 @@ public class RegionBroker implements Broker { private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); private BrokerId brokerId; private String brokerName; + private Map clientIdSet = new HashMap(); // we will synchronize access public RegionBroker(TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException { this(taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null); @@ -110,10 +114,35 @@ public class RegionBroker implements Broker { } public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable { + String clientId = info.getClientId(); + if (clientId == null) { + throw new InvalidClientIDException("No clientID specified for connection request"); + } + synchronized (clientIdSet ) { + if (clientIdSet.containsKey(clientId)) { + throw new InvalidClientIDException("Client: " + clientId + " already connected"); + } + else { + clientIdSet.put(clientId, info); + } + } + connections.add(context.getConnection()); } public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Throwable { + String clientId = info.getClientId(); + if (clientId == null) { + throw new InvalidClientIDException("No clientID specified for connection disconnect request"); + } + synchronized (clientIdSet) { + ConnectionInfo oldValue = (ConnectionInfo) clientIdSet.get(clientId); + // we may be removing the duplicate connection, not the first connection to be created + if (oldValue == info) { + clientIdSet.remove(clientId); + } + } + connections.remove(context.getConnection()); } diff --git a/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java b/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java new file mode 100644 index 0000000000..9921d6d638 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java @@ -0,0 +1,79 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.InvalidClientIDException; +import javax.jms.JMSException; +import javax.jms.Session; + +/** + * + * @version $Revision$ + */ +public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport { + + protected Connection connection; + protected boolean transacted; + protected int authMode = Session.AUTO_ACKNOWLEDGE; + + public void testReconnectMultipleTimesWithSameClientID() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + // now lets create another which should fail + for (int i = 1; i < 11; i++) { + Connection connection2 = connectionFactory.createConnection(); + try { + useConnection(connection2); + fail("Should have thrown InvalidClientIDException on attempt" + i); + } + catch (InvalidClientIDException e) { + connection2.close(); + System.out.println("Caught expected: " + e); + } + } + + // now lets try closing the original connection and creating a new connection with the same ID + connection.close(); + connection = connectionFactory.createConnection(); + useConnection(connection); + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:61616"; + super.setUp(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + protected void useConnection(Connection connection) throws JMSException { + connection.setClientID("foo"); + connection.start(); + /** + * Session session = connection.createSession(transacted, authMode); + * return session; + */ + } +}