This closes #677

This commit is contained in:
Clebert Suconic 2016-07-28 11:33:47 -04:00
commit 9a9f23e8a5
12 changed files with 151 additions and 199 deletions

View File

@ -295,6 +295,13 @@ public final class JsonUtil {
return jsonValue.toString();
}
}
else if (jsonValue instanceof Object[]) {
Object[] array = (Object[]) jsonValue;
for (int i = 0; i < array.length; i++) {
array[i] = convertJsonValue(array[i], desiredType);
}
return array;
}
else {
return jsonValue;
}

View File

@ -27,7 +27,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
* The ClientRequestor constructor is given a ClientSession and a request address.
* It creates a temporary queue for the responses and provides a request method that sends the request message and waits for its reply.
*/
public final class ClientRequestor {
public final class ClientRequestor implements AutoCloseable {
private final ClientSession queueSession;

View File

@ -16,15 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import java.util.Map;
import org.apache.activemq.artemis.api.core.management.AcceptorControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.junit.Test;
import java.util.Map;
public class AcceptorControlUsingCoreTest extends AcceptorControlTest {
// Constants -----------------------------------------------------
@ -37,20 +34,11 @@ public class AcceptorControlUsingCoreTest extends AcceptorControlTest {
// AcceptorControlTest overrides --------------------------------
private ClientSession session;
@Override
protected AcceptorControl createManagementControl(final String name) throws Exception {
ServerLocator locator = createInVMNonHALocator();
addServerLocator(locator);
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
addClientSession(session);
session.start();
return new AcceptorControl() {
private final CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_ACCEPTOR + name);
private final CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_ACCEPTOR + name);
@Override
public String getFactoryClassName() {

View File

@ -81,6 +81,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
private static boolean contains(final String name, final String[] strings) {
boolean found = false;
for (String str : strings) {
IntegrationTestLogger.LOGGER.info("Does " + str + " match " + name);
if (name.equals(str)) {
found = true;
break;
@ -900,7 +901,14 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
@Test
public void testForceFailover() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
serverControl.forceFailover();
try {
serverControl.forceFailover();
}
catch (Exception e) {
if (!usingCore()) {
fail(e.getMessage());
}
}
assertFalse(server.isStarted());
}
@ -948,7 +956,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
}
assertEquals(CONNECTION_COUNT + (usingCore() ? 1 : 0), serverControl.getTotalConnectionCount());
assertEquals(0 + (usingCore() ? 1 : 0), serverControl.getConnectionCount());
assertEquals((usingCore() ? 1 : 0), serverControl.getConnectionCount());
locator.close();
}
@ -986,7 +994,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
session.commit();
assertEquals(2, serverControl.getTotalMessagesAdded());
assertEquals(0 + (usingCore() ? 1 : 0), serverControl.getTotalMessageCount());
assertEquals(0, serverControl.getTotalMessageCount());
consumer1.close();
consumer2.close();
@ -1032,7 +1040,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
session.commit();
assertEquals(2, serverControl.getTotalMessagesAcknowledged());
assertEquals(0 + (usingCore() ? 1 : 0), serverControl.getTotalMessageCount());
assertEquals(0, serverControl.getTotalMessageCount());
consumer1.close();
consumer2.close();
@ -1064,7 +1072,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ClientConsumer consumer1 = session.createConsumer(random1);
ClientConsumer consumer2 = session.createConsumer(random2);
assertEquals(2 + (usingCore() ? 1 : 0), serverControl.getTotalConsumerCount());
assertEquals(usingCore() ? 3 : 2, serverControl.getTotalConsumerCount());
assertEquals(1, queueControl1.getConsumerCount());
assertEquals(1, queueControl2.getConsumerCount());
@ -1086,6 +1094,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator locator = createInVMNonHALocator();
factories.add(createSessionFactory(locator));
Thread.sleep(200);
factories.add(createSessionFactory(locator));
addClientSession(factories.get(1).createSession());
@ -1093,17 +1102,34 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
IntegrationTestLogger.LOGGER.info(jsonString);
Assert.assertNotNull(jsonString);
JsonArray array = JsonUtil.readJsonArray(jsonString);
Assert.assertEquals(2, array.size());
JsonObject first;
JsonObject second;
if (array.getJsonObject(0).getJsonNumber("creationTime").longValue() < array.getJsonObject(1).getJsonNumber("creationTime").longValue()) {
first = array.getJsonObject(0);
second = array.getJsonObject(1);
Assert.assertEquals(usingCore() ? 3 : 2, array.size());
String key = "creationTime";
JsonObject[] sorted = new JsonObject[array.size()];
for (int i = 0; i < array.size(); i++) {
sorted[i] = array.getJsonObject(i);
}
else {
first = array.getJsonObject(1);
second = array.getJsonObject(0);
if (sorted[0].getJsonNumber(key).longValue() > sorted[1].getJsonNumber(key).longValue()) {
JsonObject o = sorted[1];
sorted[1] = sorted[0];
sorted[0] = o;
}
if (usingCore()) {
if (sorted[1].getJsonNumber(key).longValue() > sorted[2].getJsonNumber(key).longValue()) {
JsonObject o = sorted[2];
sorted[2] = sorted[1];
sorted[1] = o;
}
if (sorted[0].getJsonNumber(key).longValue() > sorted[1].getJsonNumber(key).longValue()) {
JsonObject o = sorted[1];
sorted[1] = sorted[0];
sorted[0] = o;
}
}
JsonObject first = sorted[0];
JsonObject second = sorted[1];
Assert.assertTrue(first.getString("connectionID").length() > 0);
Assert.assertTrue(first.getString("clientAddress").length() > 0);
@ -1186,23 +1212,41 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
server.createQueue(queueName, queueName, null, false, false);
addClientConsumer(session.createConsumer(queueName));
Thread.sleep(200);
addClientConsumer(session2.createConsumer(queueName));
String jsonString = serverControl.listAllConsumersAsJSON();
IntegrationTestLogger.LOGGER.info(jsonString);
Assert.assertNotNull(jsonString);
JsonArray array = JsonUtil.readJsonArray(jsonString);
Assert.assertEquals(2, array.size());
JsonObject first;
JsonObject second;
if (array.getJsonObject(0).getJsonNumber("creationTime").longValue() < array.getJsonObject(1).getJsonNumber("creationTime").longValue()) {
first = array.getJsonObject(0);
second = array.getJsonObject(1);
Assert.assertEquals(usingCore() ? 3 : 2, array.size());
String key = "creationTime";
JsonObject[] sorted = new JsonObject[array.size()];
for (int i = 0; i < array.size(); i++) {
sorted[i] = array.getJsonObject(i);
}
else {
first = array.getJsonObject(1);
second = array.getJsonObject(0);
if (sorted[0].getJsonNumber(key).longValue() > sorted[1].getJsonNumber(key).longValue()) {
JsonObject o = sorted[1];
sorted[1] = sorted[0];
sorted[0] = o;
}
if (usingCore()) {
if (sorted[1].getJsonNumber(key).longValue() > sorted[2].getJsonNumber(key).longValue()) {
JsonObject o = sorted[2];
sorted[2] = sorted[1];
sorted[1] = o;
}
if (sorted[0].getJsonNumber(key).longValue() > sorted[1].getJsonNumber(key).longValue()) {
JsonObject o = sorted[1];
sorted[1] = sorted[0];
sorted[0] = o;
}
}
JsonObject first = sorted[0];
JsonObject second = sorted[1];
Assert.assertTrue(first.getJsonNumber("creationTime").longValue() > 0);
Assert.assertNotNull(first.getJsonNumber("consumerID").longValue());

View File

@ -16,13 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.junit.Before;
public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTest {
@ -46,33 +42,6 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
// ActiveMQServerControlTest overrides --------------------------
private ClientSession session;
private ServerLocator locator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
session.start();
}
@Override
protected void restartServer() throws Exception {
session.close();
super.restartServer();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
session.start();
}
// the core messaging proxy doesn't work when the server is stopped so we cant run these 2 tests
@Override
public void testScaleDownWithOutConnector() throws Exception {
@ -95,7 +64,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
throw new UnsupportedOperationException();
}
private final CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_SERVER);
private final CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_SERVER);
@Override
public boolean isSharedStore() {
@ -225,7 +194,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Override
public String[] getQueueNames() {
return ActiveMQServerControlUsingCoreTest.toStringArray((Object[]) proxy.retrieveAttributeValue("queueNames"));
return ActiveMQServerControlUsingCoreTest.toStringArray((Object[]) proxy.retrieveAttributeValue("queueNames", String.class));
}
@Override
@ -304,7 +273,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Override
public int getMessageCounterMaxDayCount() {
return (Integer) proxy.retrieveAttributeValue("messageCounterMaxDayCount");
return (Integer) proxy.retrieveAttributeValue("messageCounterMaxDayCount", Integer.class);
}
@Override
@ -708,7 +677,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Override
public String listConsumersAsJSON(String connectionID) throws Exception {
return (String) proxy.invokeOperation("listConsumersAsJSON");
return (String) proxy.invokeOperation("listConsumersAsJSON", connectionID);
}
@Override
@ -723,7 +692,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Override
public String listSessionsAsJSON(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception {
return (String) proxy.invokeOperation("listSessionsAsJSON");
return (String) proxy.invokeOperation("listSessionsAsJSON", connectionID);
}
};
}

View File

@ -44,6 +44,10 @@ public class AddressControlUsingCoreTest extends ManagementTestBase {
private ActiveMQServer server;
private ServerLocator locator;
private ClientSessionFactory sf;
protected ClientSession session;
// Static --------------------------------------------------------
@ -165,14 +169,14 @@ public class AddressControlUsingCoreTest extends ManagementTestBase {
server.setMBeanServer(mbeanServer);
server.start();
ServerLocator locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
ClientSessionFactory sf = createSessionFactory(locator);
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
sf = createSessionFactory(locator);
session = sf.createSession(false, true, false);
session.start();
}
protected CoreMessagingProxy createProxy(final SimpleString address) throws Exception {
CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_ADDRESS + address);
CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_ADDRESS + address);
return proxy;
}

View File

@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import javax.management.MBeanServerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@ -35,12 +38,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.management.MBeanServerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BridgeControlUsingCoreTest extends ManagementTestBase {
// Constants -----------------------------------------------------
@ -53,8 +50,6 @@ public class BridgeControlUsingCoreTest extends ManagementTestBase {
private ActiveMQServer server_1;
private ClientSession session;
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@ -69,10 +64,10 @@ public class BridgeControlUsingCoreTest extends ManagementTestBase {
Assert.assertEquals(bridgeConfig.getQueueName(), proxy.retrieveAttributeValue("queueName"));
Assert.assertEquals(bridgeConfig.getForwardingAddress(), proxy.retrieveAttributeValue("forwardingAddress"));
Assert.assertEquals(bridgeConfig.getFilterString(), proxy.retrieveAttributeValue("filterString"));
Assert.assertEquals(bridgeConfig.getRetryInterval(), ((Long) proxy.retrieveAttributeValue("retryInterval")).longValue());
Assert.assertEquals(bridgeConfig.getRetryIntervalMultiplier(), proxy.retrieveAttributeValue("retryIntervalMultiplier"));
Assert.assertEquals(bridgeConfig.getReconnectAttempts(), ((Integer) proxy.retrieveAttributeValue("reconnectAttempts")).intValue());
Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), ((Boolean) proxy.retrieveAttributeValue("useDuplicateDetection")).booleanValue());
Assert.assertEquals(bridgeConfig.getRetryInterval(), proxy.retrieveAttributeValue("retryInterval", Long.class));
Assert.assertEquals(bridgeConfig.getRetryIntervalMultiplier(), proxy.retrieveAttributeValue("retryIntervalMultiplier", Double.class));
Assert.assertEquals(bridgeConfig.getReconnectAttempts(), proxy.retrieveAttributeValue("reconnectAttempts", Integer.class));
Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), proxy.retrieveAttributeValue("useDuplicateDetection", Boolean.class));
Object[] data = (Object[]) proxy.retrieveAttributeValue("staticConnectors");
Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), data[0]);
@ -125,14 +120,10 @@ public class BridgeControlUsingCoreTest extends ManagementTestBase {
server_0 = addServer(ActiveMQServers.newActiveMQServer(conf_0, mbeanServer, false));
server_0.start();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
session = addClientSession(sf.createSession(false, true, true));
session.start();
}
protected CoreMessagingProxy createProxy(final String name) throws Exception {
CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_BRIDGE + name);
CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_BRIDGE + name);
return proxy;
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.BroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@ -26,17 +23,12 @@ public class BroadcastGroupControlUsingCoreTest extends BroadcastGroupControlTes
@Override
protected BroadcastGroupControl createManagementControl(final String name) throws Exception {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
final ClientSession session = addClientSession(sf.createSession(false, true, true));
session.start();
return new BroadcastGroupControl() {
private final CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_BROADCAST_GROUP + name);
private final CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_BROADCAST_GROUP + name);
@Override
public long getBroadcastPeriod() {
return ((Integer) proxy.retrieveAttributeValue("broadcastPeriod")).longValue();
return (Long) proxy.retrieveAttributeValue("broadcastPeriod", Long.class);
}
@Override
@ -56,12 +48,12 @@ public class BroadcastGroupControlUsingCoreTest extends BroadcastGroupControlTes
@Override
public int getGroupPort() {
return (Integer) proxy.retrieveAttributeValue("groupPort");
return (Integer) proxy.retrieveAttributeValue("groupPort", Integer.class);
}
@Override
public int getLocalBindPort() {
return (Integer) proxy.retrieveAttributeValue("localBindPort");
return (Integer) proxy.retrieveAttributeValue("localBindPort", Integer.class);
}
@Override

View File

@ -16,14 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import java.util.Map;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.junit.Before;
import java.util.Map;
public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionControlTest {
@ -31,9 +27,6 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
// Attributes ----------------------------------------------------
private ClientSession session;
private ServerLocator locator;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -42,12 +35,8 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
@Override
protected ClusterConnectionControl createManagementControl(final String name) throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
session.start();
return new ClusterConnectionControl() {
private final CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_CLUSTER_CONNECTION + name);
private final CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_CLUSTER_CONNECTION + name);
@Override
public Object[] getStaticConnectors() {
@ -71,12 +60,12 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
@Override
public int getMaxHops() {
return (Integer) proxy.retrieveAttributeValue("maxHops");
return (Integer) proxy.retrieveAttributeValue("maxHops", Integer.class);
}
@Override
public long getRetryInterval() {
return (Long) proxy.retrieveAttributeValue("retryInterval");
return (Long) proxy.retrieveAttributeValue("retryInterval", Long.class);
}
@Override
@ -91,7 +80,7 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
@Override
public boolean isDuplicateDetection() {
return (Boolean) proxy.retrieveAttributeValue("duplicateDetection");
return (Boolean) proxy.retrieveAttributeValue("duplicateDetection", Boolean.class);
}
@Override
@ -111,7 +100,7 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
@Override
public boolean isStarted() {
return (Boolean) proxy.retrieveAttributeValue("started");
return (Boolean) proxy.retrieveAttributeValue("started", Boolean.class);
}
@Override
@ -133,14 +122,6 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
// Protected -----------------------------------------------------
@Override
@Before
public void setUp() throws Exception {
super.setUp();
locator = createInVMNonHALocator();
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------

View File

@ -17,9 +17,12 @@
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
public class CoreMessagingProxy {
@ -30,21 +33,16 @@ public class CoreMessagingProxy {
private final String resourceName;
private final ClientSession session;
private final ClientRequestor requestor;
private final ServerLocator locator;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public CoreMessagingProxy(final ClientSession session, final String resourceName) throws Exception {
this.session = session;
public CoreMessagingProxy(final ServerLocator locator, final String resourceName) throws Exception {
this.locator = locator;
this.resourceName = resourceName;
requestor = new ClientRequestor(session, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
}
// Public --------------------------------------------------------
@ -58,10 +56,12 @@ public class CoreMessagingProxy {
}
public Object retrieveAttributeValue(final String attributeName, final Class desiredType) {
ClientMessage m = session.createMessage(false);
ManagementHelper.putAttribute(m, resourceName, attributeName);
ClientMessage reply;
try {
try (ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession session = getSession(sessionFactory);
ClientRequestor requestor = getClientRequestor(session)) {
ClientMessage m = session.createMessage(false);
ManagementHelper.putAttribute(m, resourceName, attributeName);
ClientMessage reply;
reply = requestor.request(m);
Object result = ManagementHelper.getResult(reply, desiredType);
@ -77,24 +77,38 @@ public class CoreMessagingProxy {
}
public Object invokeOperation(final Class desiredType, final String operationName, final Object... args) throws Exception {
ClientMessage m = session.createMessage(false);
ManagementHelper.putOperationInvocation(m, resourceName, operationName, args);
ClientMessage reply = requestor.request(m);
if (reply != null) {
if (ManagementHelper.hasOperationSucceeded(reply)) {
return ManagementHelper.getResult(reply, desiredType);
try (ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession session = getSession(sessionFactory);
ClientRequestor requestor = getClientRequestor(session)) {
ClientMessage m = session.createMessage(false);
ManagementHelper.putOperationInvocation(m, resourceName, operationName, args);
ClientMessage reply = requestor.request(m);
if (reply != null) {
if (ManagementHelper.hasOperationSucceeded(reply)) {
return ManagementHelper.getResult(reply, desiredType);
}
else {
throw new Exception((String) ManagementHelper.getResult(reply));
}
}
else {
throw new Exception((String) ManagementHelper.getResult(reply));
return null;
}
}
else {
return null;
}
}
// Private -------------------------------------------------------
private ClientSession getSession(ClientSessionFactory sessionFactory) throws ActiveMQException {
ClientSession session = sessionFactory.createSession(false, true, true);
session.start();
return session;
}
private ClientRequestor getClientRequestor(ClientSession session) throws Exception {
return new ClientRequestor(session, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
}
// Inner classes -------------------------------------------------
}

View File

@ -16,12 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.junit.Before;
public class DivertControlUsingCoreTest extends DivertControlTest {
@ -29,9 +25,6 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
// Attributes ----------------------------------------------------
private ClientSession session;
private ServerLocator locator;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -40,12 +33,8 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
@Override
protected DivertControl createManagementControl(final String name) throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
session.start();
return new DivertControl() {
private final CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_DIVERT + name);
private final CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_DIVERT + name);
@Override
public String getAddress() {
@ -91,14 +80,6 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
// Protected -----------------------------------------------------
@Override
@Before
public void setUp() throws Exception {
super.setUp();
locator = createInVMNonHALocator();
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------

View File

@ -16,29 +16,21 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import javax.management.openmbean.CompositeData;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.junit.Before;
import javax.management.openmbean.CompositeData;
public class QueueControlUsingCoreTest extends QueueControlTest {
protected ClientSession session;
private ServerLocator locator;
@Override
protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue) throws Exception {
return new QueueControl() {
private final CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_QUEUE + queue);
private final CoreMessagingProxy proxy = new CoreMessagingProxy(addServerLocator(createInVMNonHALocator()), ResourceNames.CORE_QUEUE + queue);
@Override
public void flushExecutor() {
@ -395,15 +387,4 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
};
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
session.start();
}
}