diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java index 95a803c93d..23310f85a4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java @@ -57,6 +57,8 @@ public class ConnectionState { sessions.clear(); tempDestinations.clear(); shutdown.set(false); + // Add the default session id. + addSession(new SessionInfo(info, -1)); } public void addTempDestination(DestinationInfo info) { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java new file mode 100644 index 0000000000..0c791fdda0 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.SessionId; + +public class ConcurrentConnectSimulationTest extends BrokerTestSupport { + + /* + * simulate failover and retry of connection before broker has killed connection + * which appears as a concurrent connect request to the broker + * see: https://issues.apache.org/activemq/browse/AMQ-2241 + */ + public void testConcurrentConnection() throws Exception { + + StubConnection connection1 = createConnection(); + StubConnection connection2 = createConnection(); + + // reuse same connection info + ConnectionInfo connectionInfo = createConnectionInfo(); + connection1.request(connectionInfo); + connection2.request(connectionInfo); + + // second one should win out, verify using consumer on default session (watchAdvisories) + ConsumerId consumerId = new ConsumerId(new SessionId(connectionInfo.getConnectionId(), -1), 1); + ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); + consumerInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); + + connection2.request(consumerInfo); + } + + public static Test suite() { + return suite(ConcurrentConnectSimulationTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } +}