This commit is contained in:
Clebert Suconic 2018-06-15 10:42:31 -04:00
commit 13954d3509
2 changed files with 31 additions and 9 deletions

View File

@ -183,12 +183,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
*/
private ServerSession internalSession;
/**
* Used for proper closing of internal sessions like OpenWire advisory
* session at disconnect.
*/
private final Set<SessionId> internalSessionIds = new ConcurrentHashSet<>();
private final OperationContext operationContext;
private static final AtomicLongFieldUpdater<OpenWireConnection> LAST_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(OpenWireConnection.class, "lastSent");
@ -616,8 +610,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
state.shutdown();
try {
for (SessionId sessionId : internalSessionIds) {
sessions.get(sessionId).close();
for (SessionId sessionId : sessionIdMap.values()) {
AMQSession session = sessions.get(sessionId);
if (session != null) {
session.close();
}
}
internalSession.close(false);
} catch (Exception e) {
@ -993,7 +990,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void addSessions(Set<SessionId> sessionSet) {
for (SessionId sid : sessionSet) {
addSession(getState().getSessionState(sid).getInfo(), true);
internalSessionIds.add(sid);
}
}
@ -1017,6 +1013,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
AMQSession session = sessions.remove(info.getSessionId());
if (session != null) {
sessionIdMap.remove(session.getCoreSession().getName());
session.close();
}
}

View File

@ -56,4 +56,29 @@ public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session"));
assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
}
@Test
public void testInternalSessionHandlingNoSessionClose() throws Exception {
try (Connection conn = factory.createConnection()) {
conn.start();
for (int i = 0; i < 100; i++) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = createDestination(session,ActiveMQDestination.QUEUE_TYPE);
sendMessages(session, dest, 1);
MessageConsumer consumer = session.createConsumer(dest);
Message m = consumer.receive(2000);
consumer.close();
assertNotNull(m);
if (i % 2 == 1) {
// it will close only half of the sessions
// just to introduce error conditions
session.close();
}
}
}
assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session"));
assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
}
}