diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 0c4289fb52..95ef0c0db2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -1240,6 +1240,14 @@ public interface ActiveMQServerControl { boolean closeSessionWithID(@Parameter(desc = "The connection ID", name = "connectionID") String connectionID, @Parameter(desc = "The session ID", name = "ID") String ID) throws Exception; + /** + * Closes the session with the given id. + */ + @Operation(desc = "Closes the session with the id", impact = MBeanOperationInfo.INFO) + boolean closeSessionWithID(@Parameter(desc = "The connection ID", name = "connectionID") String connectionID, + @Parameter(desc = "The session ID", name = "ID") String ID, + @Parameter(desc = "Force session close cancelling pending tasks", name = "force") boolean force) throws Exception; + /** * Closes the consumer with the given id. */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 1096ebee4f..bec69e4398 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -2381,6 +2381,11 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public boolean closeSessionWithID(final String connectionID, final String ID) throws Exception { + return closeSessionWithID(connectionID, ID, false); + } + + @Override + public boolean closeSessionWithID(final String connectionID, final String ID, final boolean force) throws Exception { // possibly a long running task try (AutoCloseable lock = server.managementLock()) { if (AuditLogger.isBaseLoggingEnabled()) { @@ -2393,7 +2398,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active List sessions = server.getSessions(connectionID); for (ServerSession session : sessions) { if (session.getName().equals(ID.toString())) { - session.close(true); + session.close(true, force); return true; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java index 104a79c257..57ba7eca7a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java @@ -61,4 +61,8 @@ public interface OperationContext extends IOCompletion { * @throws Exception */ boolean waitCompletion(long timeout) throws Exception; + + default void clear() { + + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index 0a17c8d2d8..ceab9c0c1f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -447,4 +447,27 @@ public class OperationContextImpl implements OperationContext { executorsPendingField + "]"; } + + @Override + public synchronized void clear() { + stored = 0; + storeLineUpField = 0; + minimalReplicated = 0; + replicated = 0; + replicationLineUpField = 0; + paged = 0; + minimalPage = 0; + pageLineUpField = 0; + errorCode = -1; + errorMessage = null; + executorsPendingField = 0; + + if (tasks != null) { + tasks.clear(); + } + + if (storeOnlyTasks != null) { + storeOnlyTasks.clear(); + } + } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index d02851ee8f..9357138b1f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -372,6 +372,8 @@ public interface ServerSession extends SecurityAuth { void close(boolean failed) throws Exception; + void close(boolean failed, boolean force) throws Exception; + void setTransferring(boolean transferring); Set getServerConsumers(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 3284654049..0833fd81ba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1707,9 +1707,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void close(final boolean failed) { + close(failed, false); + } + + @Override + public void close(final boolean failed, final boolean force) { if (closed) return; + if (force) { + context.clear(); + } + context.executeOnCompletion(new IOCallback() { @Override public void onError(int errorCode, String errorMessage) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index b6585dbcc4..9f2f4be22d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -5805,6 +5805,60 @@ public class ActiveMQServerControlTest extends ManagementTestBase { Assert.assertTrue(((org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer)JMSclient).isClosed()); } + @Test + public void testForceCloseSession() throws Exception { + testForceCloseSession(false, false); + } + + @Test + public void testForceCloseSessionWithError() throws Exception { + testForceCloseSession(true, false); + } + + @Test + public void testForceCloseSessionWithPendingStoreOperation() throws Exception { + testForceCloseSession(false, true); + } + + private void testForceCloseSession(boolean error, boolean pendingStoreOperation) throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString name = RandomUtil.randomSimpleString(); + boolean durable = true; + + ActiveMQServerControl serverControl = createManagementControl(); + + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + if (legacyCreateQueue) { + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false); + } else { + serverControl.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.ANYCAST).setDurable(durable).setAutoCreateAddress(false).toJSON()); + } + + ServerLocator receiveLocator = createInVMNonHALocator().setCallTimeout(500); + ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator); + ClientSession receiveClientSession = receiveCsf.createSession(true, false, false); + final ClientConsumer clientConsumer = receiveClientSession.createConsumer(name); + + Assert.assertEquals(1, server.getSessions().size()); + + ServerSession serverSession = server.getSessions().iterator().next(); + Assert.assertEquals(((ClientSessionImpl)receiveClientSession).getName(), serverSession.getName()); + + if (error) { + serverSession.getSessionContext().onError(0, "error"); + } + + if (pendingStoreOperation) { + serverSession.getSessionContext().storeLineUp(); + } + + serverControl.closeSessionWithID(serverSession.getConnectionID().toString(), serverSession.getName(), true); + + Wait.assertTrue(() -> serverSession.getServerConsumers().size() == 0, 500); + Wait.assertTrue(() -> server.getSessions().size() == 0, 500); + } + @Test public void testAddUser() throws Exception { ActiveMQServerControl serverControl = createManagementControl(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index fd1fee8be2..8ae3599eaa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -159,6 +159,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes return (Boolean) proxy.invokeOperation("closeSessionWithID", connectionID, ID); } + @Override + public boolean closeSessionWithID(String connectionID, String ID, boolean force) throws Exception { + return (Boolean) proxy.invokeOperation("closeSessionWithID", connectionID, ID, force); + } + @Override public boolean closeConsumerWithID(String sessionID, String ID) throws Exception { return (Boolean) proxy.invokeOperation("closeConsumerWithID", sessionID, ID);