From 7d61969795a9b55126818233903811bf6a1c30de Mon Sep 17 00:00:00 2001 From: Pat Fox Date: Sun, 26 Nov 2017 15:00:23 +0100 Subject: [PATCH] ARTEMIS-1526 race condition between listConsumers() and closing a Session. When session not found, ignore that consumer and continue. --- .../management/impl/view/ConsumerView.java | 18 +++- .../ActiveMQServerControlMultiThreadTest.java | 93 ++++++++++++++++++- 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java index a54e40b704..386425a1d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java @@ -45,7 +45,23 @@ public class ConsumerView extends ActiveMQAbstractView { @Override public JsonObjectBuilder toJson(ServerConsumer consumer) { ServerSession session = server.getSessionByID(consumer.getSessionID()); - JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID())).add("sessionName", toString(consumer.getSessionName())).add("connectionClientID", toString(consumer.getConnectionClientID())).add("user", toString(session.getUsername())).add("connectionProtocolName", toString(consumer.getConnectionProtocolName())).add("queueName", toString(consumer.getQueueName())).add("queueType", toString(consumer.getQueueType()).toLowerCase()).add("queueAddress", toString(consumer.getQueueAddress().toString())).add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress())).add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress())).add("creationTime", new Date(consumer.getCreationTime()).toString()); + + //if session is not available then consumer is not in valid state - ignore + if (session == null) { + return null; + } + + JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID())) + .add("sessionName", toString(consumer.getSessionName())) + .add("connectionClientID", toString(consumer.getConnectionClientID())) + .add("user", toString(session.getUsername())) + .add("connectionProtocolName", toString(consumer.getConnectionProtocolName())) + .add("queueName", toString(consumer.getQueueName())) + .add("queueType", toString(consumer.getQueueType()).toLowerCase()) + .add("queueAddress", toString(consumer.getQueueAddress().toString())) + .add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress())) + .add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress())) + .add("creationTime", new Date(consumer.getCreationTime()).toString()); return obj; } diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java index 2735d04d24..6f4b886ca9 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java @@ -27,9 +27,15 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase; import org.jboss.byteman.contrib.bmunit.BMRule; @@ -57,8 +63,7 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase { */ @Test - @BMRules(rules = {@BMRule( - name = "Delay listAddress() by 2 secs ", + @BMRules(rules = {@BMRule(name = "Delay listAddress() by 2 secs ", targetClass = "org.apache.activemq.artemis.core.management.impl.view.AddressView ", targetMethod = "(org.apache.activemq.artemis.core.server.ActiveMQServer)", targetLocation = "ENTRY", @@ -109,6 +114,90 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase { } } + /** + * Aim: verify that no exceptions will occur when a session is closed during listConsumers() operation + * + * test delays the listConsumer() BEFORE the Session information associated with the consumer is retrieved. + * During this delay the client session is closed. + * + * @throws Exception + */ + + @Test + @BMRules(rules = {@BMRule(name = "Delay listConsumers() by 2 secs ", + targetClass = "org.apache.activemq.artemis.core.management.impl.view.ConsumerView", + targetMethod = "toJson(org.apache.activemq.artemis.core.server.ServerConsumer)", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.ActiveMQServerControlMultiThreadTest.delay(2)")}) + + public void listConsumersDuringSessionClose() throws Exception { + + ExecutorService executorService = Executors.newFixedThreadPool(1); + SimpleString addressName1 = new SimpleString("MyAddress_one"); + SimpleString queueName1 = new SimpleString("my_queue_one"); + + ActiveMQServerControl serverControl = createManagementControl(); + + server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST)); + server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, false, false); + + // create a consumer + try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator); + ClientSession session = csf.createSession()) { + + ClientConsumer consumer1_q1 = session.createConsumer(queueName1); + + // add another consumer (on separate session) + ClientSession session_two = csf.createSession(); + ClientConsumer consumer2_q1 = session_two.createConsumer(queueName1); + + //first(normal) invocation - ensure 2 consumers returned + //used to block thread, until the delay() has been called. + delayCalled = new CountDownLatch(1); + + String consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10); + + JsonObject consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString); + JsonArray consumersArray = (JsonArray) consumersAsJsonObject.get("data"); + + Assert.assertEquals("number of consumers returned from query", 2, consumersArray.size()); + Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName")); + Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName")); + + //second invocation - close session during listConsumers() + + //used to block thread, until the delay() has been called. + delayCalled = new CountDownLatch(1); + + executorService.submit(new Runnable() { + @Override + public void run() { + try { + //wait until the delay occurs and close the session. + delayCalled.await(); + session.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + }); + + consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10); + + consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString); + consumersArray = (JsonArray) consumersAsJsonObject.get("data"); + + // session is closed before Json string is created - should only be one consumer returned + Assert.assertEquals("number of consumers returned from query", 1, consumersArray.size()); + Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName")); + Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName")); + + } finally { + executorService.shutdown(); + } + } + //notify delay has been called and wait for X seconds public static void delay(int seconds) { delayCalled.countDown();