This commit is contained in:
Clebert Suconic 2018-05-03 12:23:58 -04:00
commit 75c92a364d
4 changed files with 74 additions and 11 deletions

View File

@ -82,7 +82,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -609,6 +608,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
// is it necessary? even, do we need state at all?
state.shutdown();
try {
internalSession.close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
// Then call the listeners
// this should closes underlying sessions
callFailureListeners(me);
@ -719,13 +724,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
info.setClientIp(getRemoteAddress());
}
createInternalSession(info);
createInternalSession();
return context;
}
private void createInternalSession(ConnectionInfo info) throws Exception {
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes());
private void createInternalSession() throws Exception {
SessionInfo sessionInfo = getState().getSessionStates().iterator().next().getInfo();
AMQSession session = addSession(sessionInfo, true);
internalSession = session.getCoreSession();
}
//raise the refCount of context

View File

@ -320,6 +320,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
oldConnection.disconnect(true);
connections.remove(oldConnection);
connection.reconnect(context, info);
// init the conn after reconnect
context.getConnection().addSessions(context.getConnectionState().getSessionIds());
} else {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
}
@ -336,9 +339,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
ConnectionInfo copy = info.copy();
copy.setPassword("");
fireAdvisory(context, topic, copy);
// init the conn
context.getConnection().addSessions(context.getConnectionState().getSessionIds());
}
}

View File

@ -161,7 +161,7 @@
<dependency>
<groupId>org.apache.activemq.rest</groupId>
<artifactId>artemis-rest</artifactId>
<version>2.6.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
@ -264,7 +264,6 @@
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid.jms.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
@ -273,7 +272,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
@ -348,7 +346,6 @@
<dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<version>${jetty.version}</version>
<type>jar</type>
<classifier>uber</classifier>
</dependency>

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.AfterClass;
import static org.junit.Assert.assertNotNull;
import org.junit.BeforeClass;
import org.junit.Test;
public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
@BeforeClass
public static void prepareLogger() {
AssertionLoggerHandler.startCapture();
}
@AfterClass
public static void clearLogger() {
AssertionLoggerHandler.stopCapture();
}
@Test
public void testInternalSessionHandling() throws Exception {
try (Connection conn = factory.createConnection()) {
conn.start();
try (Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Destination dest = createDestination(session,ActiveMQDestination.QUEUE_TYPE);
sendMessages(session, dest, 1);
MessageConsumer consumer = session.createConsumer(dest);
Message m = consumer.receive(2000);
assertNotNull(m);
}
}
assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session"));
assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
}
}