ARTEMIS-1107 test AddressControl.sendMessage

Add tests for this management operation with both core and AMQP encoded
messages. Also fix a few problems with the implementation like not
checking the passed-in headers for null and not counting messages
properly.
This commit is contained in:
Justin Bertram 2017-04-10 12:57:02 -05:00 committed by Andy Taylor
parent 359592cf5e
commit 67a06588f4
4 changed files with 94 additions and 5 deletions

View File

@ -291,8 +291,10 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
}
});
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
if (headers != null) {
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
}
message.setType((byte) type);
message.setDurable(durable);
@ -341,7 +343,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queue);
// Ignore the "special" subscription
if (coreQueueControl != null && !coreQueueControl.getName().equals(getAddress())) {
if (coreQueueControl != null) {
if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE && coreQueueControl.isDurable() ||
durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()) {
matchingQueues.add(coreQueueControl);

View File

@ -39,6 +39,8 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
@ -57,6 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -74,8 +77,11 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionMa
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.VersionLoader;
@ -109,12 +115,14 @@ public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
private static final String brokerName = "my-broker";
private static final String brokerName = "localhost";
private static final long maxSizeBytes = 1 * 1024 * 1024;
private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
private MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
private int messagesSent = 0;
// this will ensure that all tests in this class are run twice,
@ -150,6 +158,8 @@ public class ProtonTest extends ProtonTestBase {
protected ActiveMQServer createAMQPServer(int port) throws Exception {
ActiveMQServer server = super.createAMQPServer(port);
server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1");
server.setMBeanServer(mBeanServer);
server.getConfiguration().setJMXManagementEnabled(true);
return server;
}
@ -245,6 +255,36 @@ public class ProtonTest extends ProtonTestBase {
}
}
@Test
public void testAddressControlSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
Assert.assertEquals(1, addressControl.getQueueNames().length);
addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null);
Assert.assertEquals(1, addressControl.getMessageCount());
Connection connection = createConnection("myClientId");
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(address.toString());
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(500);
assertNotNull(message);
byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()];
((BytesMessage)message).readBytes(buffer);
assertEquals("test", new String(buffer));
session.close();
connection.close();
} finally {
if (connection != null) {
connection.close();
}
}
}
@Test
public void testDurableSubscriptionUnsubscribe() throws Exception {
Connection connection = createConnection("myClientId");

View File

@ -30,7 +30,7 @@ import org.junit.Before;
public class ProtonTestBase extends ActiveMQTestBase {
protected String brokerName = "my-broker";
protected String brokerName = "localhost";
protected ActiveMQServer server;
protected String tcpAmqpConnectionUri = "tcp://localhost:5672";

View File

@ -23,9 +23,12 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
@ -33,12 +36,14 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.RoleInfo;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
@ -310,6 +315,48 @@ public class AddressControlTest extends ManagementTestBase {
assertEquals(RoutingType.ANYCAST.toString(), ((JsonString) jsonArray.get(0)).getString());
}
@Test
public void testGetMessageCount() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);
AddressControl addressControl = createManagementControl(address);
assertEquals(0, addressControl.getMessageCount());
ClientProducer producer = session.createProducer(address.toString());
producer.send(session.createMessage(false));
assertEquals(0, addressControl.getMessageCount());
session.createQueue(address, RoutingType.ANYCAST, address);
producer.send(session.createMessage(false));
assertEquals(1, addressControl.getMessageCount());
session.createQueue(address, RoutingType.ANYCAST, address.concat('2'));
producer.send(session.createMessage(false));
assertEquals(2, addressControl.getMessageCount());
}
@Test
public void testSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);
AddressControl addressControl = createManagementControl(address);
Assert.assertEquals(0, addressControl.getQueueNames().length);
session.createQueue(address, RoutingType.ANYCAST, address);
Assert.assertEquals(1, addressControl.getQueueNames().length);
addressControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null);
Assert.assertEquals(1, addressControl.getMessageCount());
ClientConsumer consumer = session.createConsumer(address);
ClientMessage message = consumer.receive(500);
assertNotNull(message);
byte[] buffer = new byte[message.getBodyBuffer().readableBytes()];
message.getBodyBuffer().readBytes(buffer);
assertEquals("test", new String(buffer));
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------