diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 69d900e671..e4d0a19315 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -82,7 +82,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -609,6 +608,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se // is it necessary? even, do we need state at all? state.shutdown(); + try { + internalSession.close(false); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + // Then call the listeners // this should closes underlying sessions callFailureListeners(me); @@ -719,13 +724,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se info.setClientIp(getRemoteAddress()); } - createInternalSession(info); + createInternalSession(); return context; } - private void createInternalSession(ConnectionInfo info) throws Exception { - internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes()); + private void createInternalSession() throws Exception { + SessionInfo sessionInfo = getState().getSessionStates().iterator().next().getInfo(); + AMQSession session = addSession(sessionInfo, true); + internalSession = session.getCoreSession(); } //raise the refCount of context diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 95a400e5f9..e95493754a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -320,6 +320,9 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl oldConnection.disconnect(true); connections.remove(oldConnection); connection.reconnect(context, info); + + // init the conn after reconnect + context.getConnection().addSessions(context.getConnectionState().getSessionIds()); } else { throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress()); } @@ -336,9 +339,6 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl ConnectionInfo copy = info.copy(); copy.setPassword(""); fireAdvisory(context, topic, copy); - - // init the conn - context.getConnection().addSessions(context.getConnectionState().getSessionIds()); } } diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index df735fd952..98d67ce926 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -161,7 +161,7 @@ org.apache.activemq.rest artemis-rest - 2.6.0-SNAPSHOT + ${project.version} org.apache.activemq @@ -264,7 +264,6 @@ org.apache.qpid qpid-jms-client - ${qpid.jms.version} org.apache.qpid @@ -273,7 +272,6 @@ org.slf4j slf4j-api - 1.7.5 org.apache.activemq @@ -348,7 +346,6 @@ org.eclipse.jetty.aggregate jetty-all - ${jetty.version} jar uber diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java new file mode 100644 index 0000000000..9c476a9f88 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.AfterClass; +import static org.junit.Assert.assertNotNull; +import org.junit.BeforeClass; +import org.junit.Test; + +public class SessionHandlingOpenWireTest extends BasicOpenWireTest { + + @BeforeClass + public static void prepareLogger() { + AssertionLoggerHandler.startCapture(); + } + + @AfterClass + public static void clearLogger() { + AssertionLoggerHandler.stopCapture(); + } + + @Test + public void testInternalSessionHandling() throws Exception { + try (Connection conn = factory.createConnection()) { + conn.start(); + try (Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Destination dest = createDestination(session,ActiveMQDestination.QUEUE_TYPE); + sendMessages(session, dest, 1); + MessageConsumer consumer = session.createConsumer(dest); + Message m = consumer.receive(2000); + assertNotNull(m); + } + } + assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session")); + assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session")); + } +}