ARTEMIS-2309 TempQueueCleanerUpper instances are leaking

The changes from ARTEMIS-2189 mean that
o.a.a.a.c.s.i.ServerSessionImpl#deleteQueue
is no longer called from the same ServerSessionImpl instance that
created it which means that TempQueueCleanerUpper instances will leak.
To resolve the leak the client will only create a new session when
necessary instead of every time delete() is invoked.
This commit is contained in:
Justin Bertram 2019-04-16 15:24:06 -05:00 committed by Clebert Suconic
parent fad7aa2c97
commit e0a7073884
3 changed files with 42 additions and 8 deletions

View File

@ -421,19 +421,31 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
public void delete() throws JMSException { public void delete() throws JMSException {
if (session != null) { if (session != null) {
/** boolean openedHere = false;
* The status of the session used to create the temporary destination is uncertain, but the JMS spec states ActiveMQSession sessionToUse = session;
* that the lifetime of the temporary destination is tied to the connection so even if the originating
* session is closed the temporary destination should still be deleted. Therefore, just create a new one if (session.getCoreSession().isClosed()) {
* and close it after the temporary destination is deleted. This is necessary because the Core API is sessionToUse = (ActiveMQSession)session.getConnection().createSession();
* predicated on having a Core ClientSession which is encapsulated by the JMS session implementation. openedHere = true;
*/ }
try (ActiveMQSession sessionToUse = (ActiveMQSession) session.getConnection().createSession()) {
try {
/**
* The status of the session used to create the temporary destination is uncertain, but the JMS spec states
* that the lifetime of the temporary destination is tied to the connection so even if the originating
* session is closed the temporary destination should still be deleted. Therefore, just create a new one
* and close it after the temporary destination is deleted. This is necessary because the Core API is
* predicated on having a Core ClientSession which is encapsulated by the JMS session implementation.
*/
if (isQueue()) { if (isQueue()) {
sessionToUse.deleteTemporaryQueue(this); sessionToUse.deleteTemporaryQueue(this);
} else { } else {
sessionToUse.deleteTemporaryTopic(this); sessionToUse.deleteTemporaryTopic(this);
} }
} finally {
if (openedHere) {
sessionToUse.close();
}
} }
} }
} }

View File

@ -291,6 +291,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
this.closeables.add(closeable); this.closeables.add(closeable);
} }
public Map<SimpleString, TempQueueCleanerUpper> getTempQueueCleanUppers() {
return tempQueueCleannerUppers;
}
@Override @Override
public Executor getSessionExecutor() { public Executor getSessionExecutor() {
return sessionExecutor; return sessionExecutor;

View File

@ -26,6 +26,8 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.apache.activemq.artemis.tests.util.JMSTestBase;
@ -204,4 +206,20 @@ public class TemporaryDestinationTest extends JMSTestBase {
} }
} }
@Test
public void testForTempQueueCleanerUpperLeak() throws Exception {
try {
conn = createConnection();
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue temporaryQueue = s.createTemporaryQueue();
temporaryQueue.delete();
for (ServerSession serverSession : server.getSessions()) {
assertEquals(0, ((ServerSessionImpl)serverSession).getTempQueueCleanUppers().size());
}
} finally {
if (conn != null) {
conn.close();
}
}
}
} }