ARTEMIS-1768: Fix handling of internalSession for OpenWireConnection
This commit is contained in:
parent
6c7843f0a6
commit
08fd2acb24
|
@ -82,7 +82,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
@ -609,6 +608,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
// is it necessary? even, do we need state at all?
|
// is it necessary? even, do we need state at all?
|
||||||
state.shutdown();
|
state.shutdown();
|
||||||
|
|
||||||
|
try {
|
||||||
|
internalSession.close(false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
// Then call the listeners
|
// Then call the listeners
|
||||||
// this should closes underlying sessions
|
// this should closes underlying sessions
|
||||||
callFailureListeners(me);
|
callFailureListeners(me);
|
||||||
|
@ -719,13 +724,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
info.setClientIp(getRemoteAddress());
|
info.setClientIp(getRemoteAddress());
|
||||||
}
|
}
|
||||||
|
|
||||||
createInternalSession(info);
|
createInternalSession();
|
||||||
|
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createInternalSession(ConnectionInfo info) throws Exception {
|
private void createInternalSession() throws Exception {
|
||||||
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes());
|
SessionInfo sessionInfo = getState().getSessionStates().iterator().next().getInfo();
|
||||||
|
AMQSession session = addSession(sessionInfo, true);
|
||||||
|
internalSession = session.getCoreSession();
|
||||||
}
|
}
|
||||||
|
|
||||||
//raise the refCount of context
|
//raise the refCount of context
|
||||||
|
|
|
@ -320,6 +320,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
oldConnection.disconnect(true);
|
oldConnection.disconnect(true);
|
||||||
connections.remove(oldConnection);
|
connections.remove(oldConnection);
|
||||||
connection.reconnect(context, info);
|
connection.reconnect(context, info);
|
||||||
|
|
||||||
|
// init the conn after reconnect
|
||||||
|
context.getConnection().addSessions(context.getConnectionState().getSessionIds());
|
||||||
} else {
|
} else {
|
||||||
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
|
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
|
||||||
}
|
}
|
||||||
|
@ -336,9 +339,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
ConnectionInfo copy = info.copy();
|
ConnectionInfo copy = info.copy();
|
||||||
copy.setPassword("");
|
copy.setPassword("");
|
||||||
fireAdvisory(context, topic, copy);
|
fireAdvisory(context, topic, copy);
|
||||||
|
|
||||||
// init the conn
|
|
||||||
context.getConnection().addSessions(context.getConnectionState().getSessionIds());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -161,7 +161,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq.rest</groupId>
|
<groupId>org.apache.activemq.rest</groupId>
|
||||||
<artifactId>artemis-rest</artifactId>
|
<artifactId>artemis-rest</artifactId>
|
||||||
<version>2.6.0-SNAPSHOT</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
|
@ -264,7 +264,6 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.qpid</groupId>
|
<groupId>org.apache.qpid</groupId>
|
||||||
<artifactId>qpid-jms-client</artifactId>
|
<artifactId>qpid-jms-client</artifactId>
|
||||||
<version>${qpid.jms.version}</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.qpid</groupId>
|
<groupId>org.apache.qpid</groupId>
|
||||||
|
@ -273,7 +272,6 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.slf4j</groupId>
|
<groupId>org.slf4j</groupId>
|
||||||
<artifactId>slf4j-api</artifactId>
|
<artifactId>slf4j-api</artifactId>
|
||||||
<version>1.7.5</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
|
@ -348,7 +346,6 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.eclipse.jetty.aggregate</groupId>
|
<groupId>org.eclipse.jetty.aggregate</groupId>
|
||||||
<artifactId>jetty-all</artifactId>
|
<artifactId>jetty-all</artifactId>
|
||||||
<version>${jetty.version}</version>
|
|
||||||
<type>jar</type>
|
<type>jar</type>
|
||||||
<classifier>uber</classifier>
|
<classifier>uber</classifier>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void prepareLogger() {
|
||||||
|
AssertionLoggerHandler.startCapture();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void clearLogger() {
|
||||||
|
AssertionLoggerHandler.stopCapture();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInternalSessionHandling() throws Exception {
|
||||||
|
try (Connection conn = factory.createConnection()) {
|
||||||
|
conn.start();
|
||||||
|
try (Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
|
||||||
|
Destination dest = createDestination(session,ActiveMQDestination.QUEUE_TYPE);
|
||||||
|
sendMessages(session, dest, 1);
|
||||||
|
MessageConsumer consumer = session.createConsumer(dest);
|
||||||
|
Message m = consumer.receive(2000);
|
||||||
|
assertNotNull(m);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session"));
|
||||||
|
assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue