This closes #1675
This commit is contained in:
commit
328cf5cc59
|
@ -45,7 +45,23 @@ public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
|
||||||
@Override
|
@Override
|
||||||
public JsonObjectBuilder toJson(ServerConsumer consumer) {
|
public JsonObjectBuilder toJson(ServerConsumer consumer) {
|
||||||
ServerSession session = server.getSessionByID(consumer.getSessionID());
|
ServerSession session = server.getSessionByID(consumer.getSessionID());
|
||||||
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID())).add("sessionName", toString(consumer.getSessionName())).add("connectionClientID", toString(consumer.getConnectionClientID())).add("user", toString(session.getUsername())).add("connectionProtocolName", toString(consumer.getConnectionProtocolName())).add("queueName", toString(consumer.getQueueName())).add("queueType", toString(consumer.getQueueType()).toLowerCase()).add("queueAddress", toString(consumer.getQueueAddress().toString())).add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress())).add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress())).add("creationTime", new Date(consumer.getCreationTime()).toString());
|
|
||||||
|
//if session is not available then consumer is not in valid state - ignore
|
||||||
|
if (session == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID()))
|
||||||
|
.add("sessionName", toString(consumer.getSessionName()))
|
||||||
|
.add("connectionClientID", toString(consumer.getConnectionClientID()))
|
||||||
|
.add("user", toString(session.getUsername()))
|
||||||
|
.add("connectionProtocolName", toString(consumer.getConnectionProtocolName()))
|
||||||
|
.add("queueName", toString(consumer.getQueueName()))
|
||||||
|
.add("queueType", toString(consumer.getQueueType()).toLowerCase())
|
||||||
|
.add("queueAddress", toString(consumer.getQueueAddress().toString()))
|
||||||
|
.add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress()))
|
||||||
|
.add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress()))
|
||||||
|
.add("creationTime", new Date(consumer.getCreationTime()).toString());
|
||||||
return obj;
|
return obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,9 +27,15 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
|
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
|
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
|
||||||
import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase;
|
import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMRule;
|
import org.jboss.byteman.contrib.bmunit.BMRule;
|
||||||
|
@ -57,8 +63,7 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BMRules(rules = {@BMRule(
|
@BMRules(rules = {@BMRule(name = "Delay listAddress() by 2 secs ",
|
||||||
name = "Delay listAddress() by 2 secs ",
|
|
||||||
targetClass = "org.apache.activemq.artemis.core.management.impl.view.AddressView ",
|
targetClass = "org.apache.activemq.artemis.core.management.impl.view.AddressView ",
|
||||||
targetMethod = "<init>(org.apache.activemq.artemis.core.server.ActiveMQServer)",
|
targetMethod = "<init>(org.apache.activemq.artemis.core.server.ActiveMQServer)",
|
||||||
targetLocation = "ENTRY",
|
targetLocation = "ENTRY",
|
||||||
|
@ -109,6 +114,90 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aim: verify that no exceptions will occur when a session is closed during listConsumers() operation
|
||||||
|
*
|
||||||
|
* test delays the listConsumer() BEFORE the Session information associated with the consumer is retrieved.
|
||||||
|
* During this delay the client session is closed.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@BMRules(rules = {@BMRule(name = "Delay listConsumers() by 2 secs ",
|
||||||
|
targetClass = "org.apache.activemq.artemis.core.management.impl.view.ConsumerView",
|
||||||
|
targetMethod = "toJson(org.apache.activemq.artemis.core.server.ServerConsumer)",
|
||||||
|
targetLocation = "ENTRY",
|
||||||
|
action = "org.apache.activemq.artemis.tests.extras.byteman.ActiveMQServerControlMultiThreadTest.delay(2)")})
|
||||||
|
|
||||||
|
public void listConsumersDuringSessionClose() throws Exception {
|
||||||
|
|
||||||
|
ExecutorService executorService = Executors.newFixedThreadPool(1);
|
||||||
|
SimpleString addressName1 = new SimpleString("MyAddress_one");
|
||||||
|
SimpleString queueName1 = new SimpleString("my_queue_one");
|
||||||
|
|
||||||
|
ActiveMQServerControl serverControl = createManagementControl();
|
||||||
|
|
||||||
|
server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
|
||||||
|
server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, false, false);
|
||||||
|
|
||||||
|
// create a consumer
|
||||||
|
try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator);
|
||||||
|
ClientSession session = csf.createSession()) {
|
||||||
|
|
||||||
|
ClientConsumer consumer1_q1 = session.createConsumer(queueName1);
|
||||||
|
|
||||||
|
// add another consumer (on separate session)
|
||||||
|
ClientSession session_two = csf.createSession();
|
||||||
|
ClientConsumer consumer2_q1 = session_two.createConsumer(queueName1);
|
||||||
|
|
||||||
|
//first(normal) invocation - ensure 2 consumers returned
|
||||||
|
//used to block thread, until the delay() has been called.
|
||||||
|
delayCalled = new CountDownLatch(1);
|
||||||
|
|
||||||
|
String consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10);
|
||||||
|
|
||||||
|
JsonObject consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString);
|
||||||
|
JsonArray consumersArray = (JsonArray) consumersAsJsonObject.get("data");
|
||||||
|
|
||||||
|
Assert.assertEquals("number of consumers returned from query", 2, consumersArray.size());
|
||||||
|
Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName"));
|
||||||
|
Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName"));
|
||||||
|
|
||||||
|
//second invocation - close session during listConsumers()
|
||||||
|
|
||||||
|
//used to block thread, until the delay() has been called.
|
||||||
|
delayCalled = new CountDownLatch(1);
|
||||||
|
|
||||||
|
executorService.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
//wait until the delay occurs and close the session.
|
||||||
|
delayCalled.await();
|
||||||
|
session.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10);
|
||||||
|
|
||||||
|
consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString);
|
||||||
|
consumersArray = (JsonArray) consumersAsJsonObject.get("data");
|
||||||
|
|
||||||
|
// session is closed before Json string is created - should only be one consumer returned
|
||||||
|
Assert.assertEquals("number of consumers returned from query", 1, consumersArray.size());
|
||||||
|
Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName"));
|
||||||
|
Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName"));
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//notify delay has been called and wait for X seconds
|
//notify delay has been called and wait for X seconds
|
||||||
public static void delay(int seconds) {
|
public static void delay(int seconds) {
|
||||||
delayCalled.countDown();
|
delayCalled.countDown();
|
||||||
|
|
Loading…
Reference in New Issue