ARTEMIS-4332 Add management method to close stuck server sessions

In rare cases a store operation could silently fails or starves, blocking the
related server session and all delivering messages. Those server sessions can
be closed adding a management method that cleans their operation context
before closing them.
This commit is contained in:
Domenico Francesco Bruscino 2023-06-24 09:43:41 +02:00 committed by clebertsuconic
parent b4230c62bf
commit 451d03fd75
8 changed files with 111 additions and 1 deletions

View File

@ -1240,6 +1240,14 @@ public interface ActiveMQServerControl {
boolean closeSessionWithID(@Parameter(desc = "The connection ID", name = "connectionID") String connectionID,
@Parameter(desc = "The session ID", name = "ID") String ID) throws Exception;
/**
* Closes the session with the given id.
*/
@Operation(desc = "Closes the session with the id", impact = MBeanOperationInfo.INFO)
boolean closeSessionWithID(@Parameter(desc = "The connection ID", name = "connectionID") String connectionID,
@Parameter(desc = "The session ID", name = "ID") String ID,
@Parameter(desc = "Force session close cancelling pending tasks", name = "force") boolean force) throws Exception;
/**
* Closes the consumer with the given id.
*/

View File

@ -2381,6 +2381,11 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public boolean closeSessionWithID(final String connectionID, final String ID) throws Exception {
return closeSessionWithID(connectionID, ID, false);
}
@Override
public boolean closeSessionWithID(final String connectionID, final String ID, final boolean force) throws Exception {
// possibly a long running task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
@ -2393,7 +2398,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
List<ServerSession> sessions = server.getSessions(connectionID);
for (ServerSession session : sessions) {
if (session.getName().equals(ID.toString())) {
session.close(true);
session.close(true, force);
return true;
}
}

View File

@ -61,4 +61,8 @@ public interface OperationContext extends IOCompletion {
* @throws Exception
*/
boolean waitCompletion(long timeout) throws Exception;
default void clear() {
}
}

View File

@ -447,4 +447,27 @@ public class OperationContextImpl implements OperationContext {
executorsPendingField +
"]";
}
@Override
public synchronized void clear() {
stored = 0;
storeLineUpField = 0;
minimalReplicated = 0;
replicated = 0;
replicationLineUpField = 0;
paged = 0;
minimalPage = 0;
pageLineUpField = 0;
errorCode = -1;
errorMessage = null;
executorsPendingField = 0;
if (tasks != null) {
tasks.clear();
}
if (storeOnlyTasks != null) {
storeOnlyTasks.clear();
}
}
}

View File

@ -372,6 +372,8 @@ public interface ServerSession extends SecurityAuth {
void close(boolean failed) throws Exception;
void close(boolean failed, boolean force) throws Exception;
void setTransferring(boolean transferring);
Set<ServerConsumer> getServerConsumers();

View File

@ -1707,9 +1707,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void close(final boolean failed) {
close(failed, false);
}
@Override
public void close(final boolean failed, final boolean force) {
if (closed)
return;
if (force) {
context.clear();
}
context.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {

View File

@ -5805,6 +5805,60 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertTrue(((org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer)JMSclient).isClosed());
}
@Test
public void testForceCloseSession() throws Exception {
testForceCloseSession(false, false);
}
@Test
public void testForceCloseSessionWithError() throws Exception {
testForceCloseSession(true, false);
}
@Test
public void testForceCloseSessionWithPendingStoreOperation() throws Exception {
testForceCloseSession(false, true);
}
private void testForceCloseSession(boolean error, boolean pendingStoreOperation) throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString name = RandomUtil.randomSimpleString();
boolean durable = true;
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
if (legacyCreateQueue) {
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
} else {
serverControl.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.ANYCAST).setDurable(durable).setAutoCreateAddress(false).toJSON());
}
ServerLocator receiveLocator = createInVMNonHALocator().setCallTimeout(500);
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
final ClientConsumer clientConsumer = receiveClientSession.createConsumer(name);
Assert.assertEquals(1, server.getSessions().size());
ServerSession serverSession = server.getSessions().iterator().next();
Assert.assertEquals(((ClientSessionImpl)receiveClientSession).getName(), serverSession.getName());
if (error) {
serverSession.getSessionContext().onError(0, "error");
}
if (pendingStoreOperation) {
serverSession.getSessionContext().storeLineUp();
}
serverControl.closeSessionWithID(serverSession.getConnectionID().toString(), serverSession.getName(), true);
Wait.assertTrue(() -> serverSession.getServerConsumers().size() == 0, 500);
Wait.assertTrue(() -> server.getSessions().size() == 0, 500);
}
@Test
public void testAddUser() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();

View File

@ -159,6 +159,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
return (Boolean) proxy.invokeOperation("closeSessionWithID", connectionID, ID);
}
@Override
public boolean closeSessionWithID(String connectionID, String ID, boolean force) throws Exception {
return (Boolean) proxy.invokeOperation("closeSessionWithID", connectionID, ID, force);
}
@Override
public boolean closeConsumerWithID(String sessionID, String ID) throws Exception {
return (Boolean) proxy.invokeOperation("closeConsumerWithID", sessionID, ID);