From 67a06588f41789f50b6eeea1a77da71abdbe4141 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 10 Apr 2017 12:57:02 -0500 Subject: [PATCH] ARTEMIS-1107 test AddressControl.sendMessage Add tests for this management operation with both core and AMQP encoded messages. Also fix a few problems with the implementation like not checking the passed-in headers for null and not counting messages properly. --- .../management/impl/AddressControlImpl.java | 8 ++-- .../tests/integration/amqp/ProtonTest.java | 42 ++++++++++++++++- .../integration/amqp/ProtonTestBase.java | 2 +- .../management/AddressControlTest.java | 47 +++++++++++++++++++ 4 files changed, 94 insertions(+), 5 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index afdca459ac..a3211659a7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -291,8 +291,10 @@ public class AddressControlImpl extends AbstractControl implements AddressContro } }); CoreMessage message = new CoreMessage(storageManager.generateID(), 50); - for (String header : headers.keySet()) { - message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header))); + if (headers != null) { + for (String header : headers.keySet()) { + message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header))); + } } message.setType((byte) type); message.setDurable(durable); @@ -341,7 +343,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queue); // Ignore the "special" subscription - if (coreQueueControl != null && !coreQueueControl.getName().equals(getAddress())) { + if (coreQueueControl != null) { if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && coreQueueControl.isDurable() || durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()) { matchingQueues.add(coreQueueControl); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 199d9c56c8..f4433383f5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -39,6 +39,8 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; import java.io.IOException; import java.io.Serializable; import java.net.URI; @@ -57,6 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -74,8 +77,11 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionMa import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ByteUtil; +import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.TimeUtils; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.VersionLoader; @@ -109,12 +115,14 @@ public class ProtonTest extends ProtonTestBase { private static final String amqpConnectionUri = "amqp://localhost:5672"; private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; - private static final String brokerName = "my-broker"; + private static final String brokerName = "localhost"; private static final long maxSizeBytes = 1 * 1024 * 1024; private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024; + private MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer(); + private int messagesSent = 0; // this will ensure that all tests in this class are run twice, @@ -150,6 +158,8 @@ public class ProtonTest extends ProtonTestBase { protected ActiveMQServer createAMQPServer(int port) throws Exception { ActiveMQServer server = super.createAMQPServer(port); server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1"); + server.setMBeanServer(mBeanServer); + server.getConfiguration().setJMXManagementEnabled(true); return server; } @@ -245,6 +255,36 @@ public class ProtonTest extends ProtonTestBase { } } + @Test + public void testAddressControlSendMessage() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); + + AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer); + Assert.assertEquals(1, addressControl.getQueueNames().length); + addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null); + + Assert.assertEquals(1, addressControl.getMessageCount()); + + Connection connection = createConnection("myClientId"); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(address.toString()); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(500); + assertNotNull(message); + byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()]; + ((BytesMessage)message).readBytes(buffer); + assertEquals("test", new String(buffer)); + session.close(); + connection.close(); + } finally { + if (connection != null) { + connection.close(); + } + } + } + @Test public void testDurableSubscriptionUnsubscribe() throws Exception { Connection connection = createConnection("myClientId"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java index 1a06c5449d..599022ed1e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java @@ -30,7 +30,7 @@ import org.junit.Before; public class ProtonTestBase extends ActiveMQTestBase { - protected String brokerName = "my-broker"; + protected String brokerName = "localhost"; protected ActiveMQServer server; protected String tcpAmqpConnectionUri = "tcp://localhost:5672"; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index e62ce12063..85f6ecafe1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -23,9 +23,12 @@ import java.util.HashSet; import java.util.Set; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; @@ -33,12 +36,14 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.RoleInfo; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Before; @@ -310,6 +315,48 @@ public class AddressControlTest extends ManagementTestBase { assertEquals(RoutingType.ANYCAST.toString(), ((JsonString) jsonArray.get(0)).getString()); } + @Test + public void testGetMessageCount() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + session.createAddress(address, RoutingType.ANYCAST, false); + + AddressControl addressControl = createManagementControl(address); + assertEquals(0, addressControl.getMessageCount()); + + ClientProducer producer = session.createProducer(address.toString()); + producer.send(session.createMessage(false)); + assertEquals(0, addressControl.getMessageCount()); + + session.createQueue(address, RoutingType.ANYCAST, address); + producer.send(session.createMessage(false)); + assertEquals(1, addressControl.getMessageCount()); + + session.createQueue(address, RoutingType.ANYCAST, address.concat('2')); + producer.send(session.createMessage(false)); + assertEquals(2, addressControl.getMessageCount()); + } + + @Test + public void testSendMessage() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + session.createAddress(address, RoutingType.ANYCAST, false); + + AddressControl addressControl = createManagementControl(address); + Assert.assertEquals(0, addressControl.getQueueNames().length); + session.createQueue(address, RoutingType.ANYCAST, address); + Assert.assertEquals(1, addressControl.getQueueNames().length); + addressControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null); + + Assert.assertEquals(1, addressControl.getMessageCount()); + + ClientConsumer consumer = session.createConsumer(address); + ClientMessage message = consumer.receive(500); + assertNotNull(message); + byte[] buffer = new byte[message.getBodyBuffer().readableBytes()]; + message.getBodyBuffer().readBytes(buffer); + assertEquals("test", new String(buffer)); + } + // Package protected --------------------------------------------- // Protected -----------------------------------------------------