diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 298f1e724c..76ad6a548f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -306,6 +306,27 @@ public interface QueueControl { @Operation(desc = "Send the messages corresponding to the given filter to this queue's Dead Letter Address", impact = MBeanOperationInfo.ACTION) int sendMessagesToDeadLetterAddress(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filterStr) throws Exception; + /** + * + * @param headers the message headers and properties to set. Can only + * container Strings maped to primitive types. + * @param body the text to send + * @param userID + * @param durable + *@param user + * @param password @return + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION) + String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map headers, + @Parameter(name = "headers", desc = "A type for the message") final int type, + @Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body, + @Parameter(name = "body", desc = "The user ID to set on the message") String userID, + @Parameter(name = "durable", desc = "Whether the message is durable") boolean durable, + @Parameter(name = "user", desc = "The user to authenticate with") String user, + @Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception; + + /** * Changes the message's priority corresponding to the specified message ID to the specified priority. * diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java index d1251fa843..941b5d0ec0 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java @@ -165,6 +165,69 @@ public interface JMSQueueControl extends DestinationControl { @Operation(desc = "Send the messages corresponding to the given filter to this queue's Dead Letter Address", impact = MBeanOperationInfo.ACTION) int sendMessagesToDeadLetterAddress(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filterStr) throws Exception; + /** + * Sends a TextMesage to the destination. + * + * @param body the text to send + * @return the message id of the message sent. + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION) + String sendTextMessage(@Parameter(name = "body") String body) throws Exception; + + /** + * Sends a TextMessage to the destination. + * + * @param properties the message properties to set as a comma sep name=value list. Can only + * contain Strings maped to primitive types or JMS properties. eg: body=hi,JMSReplyTo=Queue2 + * @return the message id of the message sent. + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION) + String sendTextMessageWithProperties(String properties) throws Exception; + + /** + * Sends a TextMesage to the destination. + * + * @param headers the message headers and properties to set. Can only + * container Strings maped to primitive types. + * @param body the text to send + * @return the message id of the message sent. + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION) + String sendTextMessage(@Parameter(name = "headers") Map headers, + @Parameter(name = "body") String body) throws Exception; + + /** + * Sends a TextMesage to the destination. + * @param body the text to send + * @param user + * @param password + * @return + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION) + String sendTextMessage(@Parameter(name = "body") String body, + @Parameter(name = "user") String user, + @Parameter(name = "password") String password) throws Exception; + + /** + * + * @param headers the message headers and properties to set. Can only + * container Strings maped to primitive types. + * @param body the text to send + * @param user + * @param password + * @return + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION) + String sendTextMessage(@Parameter(name = "headers") Map headers, + @Parameter(name = "body") String body, + @Parameter(name = "user") String user, + @Parameter(name = "password") String password) throws Exception; + /** * Changes the message's priority corresponding to the specified message ID to the specified priority. * diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java index 130f418ec3..adbe4887ab 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java @@ -22,12 +22,17 @@ import javax.management.StandardMBean; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException; import org.apache.activemq.artemis.api.core.FilterConstants; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.MessageCounterInfo; import org.apache.activemq.artemis.api.core.management.Operation; import org.apache.activemq.artemis.api.core.management.QueueControl; @@ -40,6 +45,8 @@ import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.jms.client.SelectorTranslator; import org.apache.activemq.artemis.jms.management.impl.openmbean.JMSOpenTypeSupport; import org.apache.activemq.artemis.jms.server.JMSServerManager; +import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.json.JSONArray; import org.apache.activemq.artemis.utils.json.JSONObject; @@ -294,6 +301,52 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro return coreQueueControl.sendMessagesToDeadLetterAddress(filter); } + @Override + public String sendTextMessageWithProperties(String properties) throws Exception { + String[] kvs = properties.split(","); + Map props = new HashMap(); + for (String kv : kvs) { + String[] it = kv.split("="); + if (it.length == 2) { + props.put(it[0],it[1]); + } + } + return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password")); + } + + @Override + public String sendTextMessage(String body) throws Exception { + return sendTextMessage(Collections.EMPTY_MAP, body); + } + + @Override + public String sendTextMessage(Map headers, String body) throws Exception { + return sendTextMessage(headers, body, null, null); + } + + @Override + public String sendTextMessage(String body, String user, String password) throws Exception { + return sendTextMessage(Collections.EMPTY_MAP, body, user, password); + } + + @Override + public String sendTextMessage(Map headers, String body, String user, String password) throws Exception { + boolean durable = false; + if (headers.containsKey("JMSDeliveryMode")) { + String jmsDeliveryMode = headers.remove("JMSDeliveryMode"); + if (jmsDeliveryMode != null && (jmsDeliveryMode.equals("2") || jmsDeliveryMode.equalsIgnoreCase("PERSISTENT"))) { + durable = true; + } + } + String userID = UUIDGenerator.getInstance().generateStringUUID(); + ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(56); + buffer.writeNullableSimpleString(new SimpleString(body)); + byte[] bytes = new byte[buffer.readableBytes()]; + buffer.readBytes(bytes); + coreQueueControl.sendMessage(headers, Message.TEXT_TYPE, Base64.encodeBytes(bytes), userID, durable, user, password); + return userID; + } + @Override public boolean changeMessagePriority(final String messageID, final int newPriority) throws Exception { String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index a606a0de29..b9f096418f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -38,15 +38,22 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.LinkedListIterator; +import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.json.JSONArray; import org.apache.activemq.artemis.utils.json.JSONException; import org.apache.activemq.artemis.utils.json.JSONObject; @@ -64,6 +71,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { private final PostOffice postOffice; + private final StorageManager storageManager; + private final SecurityStore securityStore; private final HierarchicalRepository addressSettingsRepository; private MessageCounter counter; @@ -106,11 +115,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { final String address, final PostOffice postOffice, final StorageManager storageManager, + final SecurityStore securityStore, final HierarchicalRepository addressSettingsRepository) throws Exception { super(QueueControl.class, storageManager); this.queue = queue; this.address = address; this.postOffice = postOffice; + this.storageManager = storageManager; + this.securityStore = securityStore; this.addressSettingsRepository = addressSettingsRepository; } @@ -703,6 +715,45 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public String sendMessage(final Map headers, + final int type, + final String body, + final String userID, + boolean durable, final String user, + final String password) throws Exception { + securityStore.check(queue.getAddress(), CheckType.SEND, new SecurityAuth() { + @Override + public String getUsername() { + return user; + } + + @Override + public String getPassword() { + return password; + } + + @Override + public RemotingConnection getRemotingConnection() { + return null; + } + }); + ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50); + for (String header : headers.keySet()) { + message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header))); + } + message.setType((byte) type); + message.setDurable(durable); + message.setTimestamp(System.currentTimeMillis()); + message.setUserID(new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(userID))); + if (body != null) { + message.getBodyBuffer().writeBytes(Base64.decode(body)); + } + message.setAddress(queue.getAddress()); + postOffice.route(message, null, true); + return "" + message.getMessageID(); + } + @Override public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception { checkStarted(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index abdf428154..4963ed8489 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1637,7 +1637,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool, protocolManagerFactories, executorFactory.getExecutor(), serviceRegistry); - messagingServerControl = managementService.registerServer(postOffice, storageManager, configuration, addressSettingsRepository, securityRepository, resourceManager, remotingService, this, queueFactory, scheduledPool, pagingManager, haPolicy.isBackup()); + messagingServerControl = managementService.registerServer(postOffice, securityStore, storageManager, configuration, addressSettingsRepository, securityRepository, resourceManager, remotingService, this, queueFactory, scheduledPool, pagingManager, haPolicy.isBackup()); // Address settings need to deployed initially, since they're require on paging manager.start() diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java index 3e77d152b7..c98c22e81d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -66,6 +67,7 @@ public interface ManagementService extends NotificationService, ActiveMQComponen void setStorageManager(StorageManager storageManager); ActiveMQServerControlImpl registerServer(final PostOffice postOffice, + final SecurityStore securityStore, final StorageManager storageManager, final Configuration configuration, final HierarchicalRepository addressSettingsRepository, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index e780ce3f95..c4d0cd65f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; @@ -100,6 +101,8 @@ public class ManagementServiceImpl implements ManagementService { private PostOffice postOffice; + private SecurityStore securityStore; + private PagingManager pagingManager; private StorageManager storageManager; @@ -166,6 +169,7 @@ public class ManagementServiceImpl implements ManagementService { @Override public ActiveMQServerControlImpl registerServer(final PostOffice postOffice, + final SecurityStore securityStore, final StorageManager storageManager1, final Configuration configuration, final HierarchicalRepository addressSettingsRepository, @@ -178,6 +182,7 @@ public class ManagementServiceImpl implements ManagementService { final PagingManager pagingManager, final boolean backup) throws Exception { this.postOffice = postOffice; + this.securityStore = securityStore; this.addressSettingsRepository = addressSettingsRepository; this.securityRepository = securityRepository; this.storageManager = storageManager1; @@ -229,7 +234,7 @@ public class ManagementServiceImpl implements ManagementService { public synchronized void registerQueue(final Queue queue, final SimpleString address, final StorageManager storageManager) throws Exception { - QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), postOffice, storageManager, addressSettingsRepository); + QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), postOffice, storageManager, securityStore, addressSettingsRepository); if (messageCounterManager != null) { MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount()); queueControl.setMessageCounter(counter); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java index eebc6aba23..c814600aef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java @@ -90,7 +90,7 @@ public class ActiveMQJAASSecurityManager implements ActiveMQSecurityManager2 { final String address, final RemotingConnection connection) { X509Certificate[] certificates = null; - if (connection.getTransportConnection() instanceof NettyConnection) { + if (connection != null && connection.getTransportConnection() instanceof NettyConnection) { certificates = CertificateUtil.getCertsFromChannel(((NettyConnection) connection.getTransportConnection()).getChannel()); } Subject localSubject; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java index 3ccb311eef..aea4ae256f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Queue; @@ -207,6 +208,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase { @Override public ActiveMQServerControlImpl registerServer(PostOffice postOffice, + SecurityStore securityStore, StorageManager storageManager, Configuration configuration, HierarchicalRepository addressSettingsRepository, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java index d86e0bd7aa..7636248896 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java @@ -29,6 +29,7 @@ import javax.management.openmbean.CompositeData; import javax.naming.Context; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -155,6 +156,24 @@ public class JMSQueueControlTest extends ManagementTestBase { Assert.assertEquals(0, data.length); } + @Test + public void testSendTextMessage() throws Exception { + JMSQueueControl queueControl = createManagementControl(); + + Assert.assertEquals(0, getMessageCount(queueControl)); + + String id = queueControl.sendTextMessage(new HashMap(), "theBody", "myUser", "myPassword"); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + CompositeData[] data = queueControl.browse(); + Assert.assertEquals(1, data.length); + Assert.assertEquals("ID:" + id, data[0].get("JMSMessageID")); + Assert.assertEquals("theBody", data[0].get("Text")); + System.out.println(data[0]); + + } + @Test public void testBrowseMessagesWithNullFilter() throws Exception { JMSQueueControl queueControl = createManagementControl(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java index a4aec4a092..2a966cf3b3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java @@ -290,6 +290,31 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest { return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr); } + @Override + public String sendTextMessage(@Parameter(name = "body") String body) throws Exception { + return null; + } + + @Override + public String sendTextMessageWithProperties(String properties) throws Exception { + return null; + } + + @Override + public String sendTextMessage(Map headers, String body) throws Exception { + return null; + } + + @Override + public String sendTextMessage(String body, String user, String password) throws Exception { + return null; + } + + @Override + public String sendTextMessage(Map headers, String body, String user, String password) throws Exception { + return (String) proxy.invokeOperation("sendTextMessage", headers, body, user, password); + } + public void setDeadLetterAddress(final String deadLetterAddress) throws Exception { proxy.invokeOperation("setDeadLetterAddress", deadLetterAddress); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 52c5edb2b4..12c63ddfe1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.tests.integration.management; import javax.management.Notification; +import javax.management.openmbean.CompositeData; +import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -39,12 +41,14 @@ import org.apache.activemq.artemis.api.core.management.MessageCounterInfo; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; +import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.json.JSONArray; import org.apache.activemq.artemis.utils.json.JSONObject; @@ -2017,6 +2021,57 @@ public class QueueControlTest extends ManagementTestBase { assertEquals(CoreNotificationType.BINDING_REMOVED.toString(), notif.getType()); } + @Test + public void testSendMessage() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + + QueueControl queueControl = createManagementControl(address, queue); + + queueControl.sendMessage(new HashMap(), MessageImpl.TEXT_TYPE, Base64.encodeBytes("theBody".getBytes()), "myID", true, "myUser", "myPassword"); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + // the message IDs are set on the server + CompositeData[] browse = queueControl.browse(null); + + Assert.assertEquals(1, browse.length); + + byte[] body = (byte[]) browse[0].get("body"); + + Assert.assertNotNull(body); + + Assert.assertEquals(new String(body), "theBody"); + } + + @Test + public void testSendNullMessage() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + + QueueControl queueControl = createManagementControl(address, queue); + + queueControl.sendMessage(new HashMap(), MessageImpl.TEXT_TYPE, null, "myID", true, "myUser", "myPassword"); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + // the message IDs are set on the server + CompositeData[] browse = queueControl.browse(null); + + Assert.assertEquals(1, browse.length); + + byte[] body = (byte[]) browse[0].get("body"); + + Assert.assertNotNull(body); + + Assert.assertEquals(new String(body), ""); + } + + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 15532559de..68dfd48a9d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -307,6 +307,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr); } + @Override + public String sendMessage(Map headers, int type, String body, String userID, boolean durable, String user, String password) throws Exception { + return (String) proxy.invokeOperation("sendMessage", headers, type, body, userID, durable, user, password); + } + public void setDeadLetterAddress(final String deadLetterAddress) throws Exception { proxy.invokeOperation("setDeadLetterAddress", deadLetterAddress); } @@ -332,7 +337,12 @@ public class QueueControlUsingCoreTest extends QueueControlTest { @Override public CompositeData[] browse(String filter) throws Exception { - return null; + Map map = (Map) proxy.invokeOperation("browse", filter); + CompositeData[] compositeDatas = (CompositeData[]) map.get(CompositeData.class.getName()); + if (compositeDatas == null) { + compositeDatas = new CompositeData[0]; + } + return compositeDatas; } @Override