diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java index f730eb4faf..f80a628bdd 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java @@ -84,6 +84,7 @@ public class ConsumerThread extends Thread { } } else { if (verbose) { + System.out.println("JMS Message ID:" + msg.getJMSMessageID()); if (bytesAsText && (msg instanceof BytesMessage)) { long length = ((BytesMessage) msg).getBodyLength(); byte[] bytes = new byte[(int) length]; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index 361134c821..539988b688 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -150,6 +150,26 @@ public interface AddressControl { @Parameter(name = "user", desc = "The user to authenticate with") String user, @Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception; + /** + * @param headers the message headers and properties to set. Can only + * container Strings maped to primitive types. + * @param body the text to send + * @param durable + * @param user + * @param password @return + * @param createMessageId whether or not to auto generate a Message ID + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected address.", impact = MBeanOperationInfo.ACTION) + String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map headers, + @Parameter(name = "type", desc = "A type for the message") int type, + @Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body, + @Parameter(name = "durable", desc = "Whether the message is durable") boolean durable, + @Parameter(name = "user", desc = "The user to authenticate with") String user, + @Parameter(name = "password", desc = "The users password to authenticate with") String password, + @Parameter(name = "createMessageId", desc = "whether or not to auto generate a Message ID") boolean createMessageId) throws Exception; + + /** * Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues. * Newly added queue will be paused too until resume is called. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index f5fc797e77..cbca467d3b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -565,6 +565,25 @@ public interface QueueControl { @Parameter(name = "user", desc = "The user to authenticate with") String user, @Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception; + /** + * @param headers the message headers and properties to set. Can only + * container Strings maped to primitive types. + * @param body the text to send + * @param durable + * @param user + * @param password @return + * @param createMessageId whether or not to auto generate a Message ID + * @throws Exception + */ + @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION) + String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map headers, + @Parameter(name = "type", desc = "A type for the message") int type, + @Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body, + @Parameter(name = "durable", desc = "Whether the message is durable") boolean durable, + @Parameter(name = "user", desc = "The user to authenticate with") String user, + @Parameter(name = "password", desc = "The users password to authenticate with") String password, + @Parameter(name = "createMessageId", desc = "whether or not to auto generate a Message ID") boolean createMessageId) throws Exception; + /** * Changes the message's priority corresponding to the specified message ID to the specified priority. * diff --git a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/addressSendMessage.js b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/addressSendMessage.js index 04b5ac0b4e..972035b3a9 100644 --- a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/addressSendMessage.js +++ b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/addressSendMessage.js @@ -42,6 +42,20 @@ var Artemis;
+ +
+
+ + +
@@ -87,7 +101,7 @@ var Artemis;

- +

+ + + `, controller: AddressSendMessageController }) @@ -110,7 +140,11 @@ var Artemis; 'durable': { 'value': true, 'converter': Core.parseBooleanValue - } + }, + 'messageID': { + 'value': true, + 'converter': Core.parseBooleanValue + } }); var ctrl = this; ctrl.messageCreator = messageCreator; diff --git a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/sendMessage.js b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/sendMessage.js index 92ad715e28..a412dfb3ce 100644 --- a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/sendMessage.js +++ b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/sendMessage.js @@ -42,6 +42,20 @@ var Artemis;
+ +
+
+ + +
@@ -87,7 +101,7 @@ var Artemis;

- +

+ + + `, controller: SendMessageController }) @@ -110,6 +140,10 @@ var Artemis; 'durable': { 'value': true, 'converter': Core.parseBooleanValue + }, + 'messageID': { + 'value': true, + 'converter': Core.parseBooleanValue } }); var ctrl = this; diff --git a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/services/sendMessageService.js b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/services/sendMessageService.js index bc4eea6c7b..f2fa297133 100644 --- a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/services/sendMessageService.js +++ b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/services/sendMessageService.js @@ -30,6 +30,7 @@ var Artemis; function message(scope, location, route, localStorage, artemisMessage, workspace, element, timeout, jolokia) { this.noCredentials = false, this.durable = true, + this.messageID = false; this.message = "", this.headers = [], this.scope = scope; @@ -118,12 +119,12 @@ var Artemis; this.formatMessage = function () { CodeEditor.autoFormatEditor(this.scope.codeMirror); }; - this.sendMessage = function (durable) { + this.sendMessage = function (durable, createMessageId) { var body = this.message; Artemis.log.debug(body); - this.doSendMessage(this.durable, body); + this.doSendMessage(this.durable, createMessageId, body); }; - this.doSendMessage = function(durable, body) { + this.doSendMessage = function(durable, createMessageId, body) { var selection = this.workspace.selection; if (selection) { var mbean = selection.objectName; @@ -151,7 +152,7 @@ var Artemis; Artemis.log.debug(type); Artemis.log.debug(body); Artemis.log.debug(durable); - this.jolokia.execute(mbean, "sendMessage(java.util.Map, int, java.lang.String, boolean, java.lang.String, java.lang.String)", headers, type, body, durable, user, pwd, Core.onSuccess(this.operationSuccess(), { error: this.onError })); + this.jolokia.execute(mbean, "sendMessage(java.util.Map, int, java.lang.String, boolean, java.lang.String, java.lang.String, boolean)", headers, type, body, durable, user, pwd, createMessageId, Core.onSuccess(this.operationSuccess(), { error: this.onError })); Core.$apply(this.scope); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java index 8dca5281d9..cfcf2c2e09 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.RunnableEx; +import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; public abstract class AbstractControl extends StandardMBean { @@ -122,6 +123,7 @@ public abstract class AbstractControl extends StandardMBean { boolean durable, String user, String password, + boolean createMessageId, Long...queueID) throws Exception { ManagementRemotingConnection fakeConnection = new ManagementRemotingConnection(); ServerSession serverSession = server.createSession("management::" + UUIDGenerator.getInstance().generateStringUUID(), user, password, @@ -159,6 +161,11 @@ public abstract class AbstractControl extends StandardMBean { message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array()); } + if (createMessageId) { + UUID userID = UUIDGenerator.getInstance().generateUUID(); + message.setUserID(userID); + } + // There's no point on direct delivery using the management thread, use false here serverSession.send(message, false); return "" + message.getMessageID(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 4f65e694f2..e384697de0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -379,11 +379,22 @@ public class AddressControlImpl extends AbstractControl implements AddressContro boolean durable, final String user, final String password) throws Exception { + return sendMessage(headers, type, body, durable, user, password, false); + } + + @Override + public String sendMessage(final Map headers, + final int type, + final String body, + boolean durable, + final String user, + final String password, + boolean createMessageId) throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****"); } try { - return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password); + return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId); } catch (Exception e) { e.printStackTrace(); throw new IllegalStateException(e.getMessage()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 4ecdeead46..5643a787ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -1308,11 +1308,22 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { boolean durable, final String user, final String password) throws Exception { + return sendMessage(headers, type, body, durable, user, password, false); + } + + @Override + public String sendMessage(final Map headers, + final int type, + final String body, + boolean durable, + final String user, + final String password, + boolean createMessageId) throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****"); } try { - String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, queue.getID()); + String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID()); if (AuditLogger.isResourceLoggingEnabled()) { AuditLogger.sendMessageSuccess(queue.getName().toString(), user); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 026cd9f662..2095272141 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -491,6 +491,35 @@ public class AddressControlTest extends ManagementTestBase { assertEquals("myValue2", message.getStringProperty("myProp2")); } + @Test + public void testSendMessageWithMessageId() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + session.createAddress(address, RoutingType.ANYCAST, false); + + AddressControl addressControl = createManagementControl(address); + Assert.assertEquals(0, addressControl.getQueueNames().length); + session.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST)); + Assert.assertEquals(1, addressControl.getQueueNames().length); + addressControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null, true); + addressControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null, false); + + Wait.waitFor(() -> addressControl.getMessageCount() == 2); + Assert.assertEquals(2, addressControl.getMessageCount()); + + ClientConsumer consumer = session.createConsumer(address); + ClientMessage message = consumer.receive(500); + assertNotNull(message); + assertNotNull(message.getUserID()); + byte[] buffer = new byte[message.getBodyBuffer().readableBytes()]; + message.getBodyBuffer().readBytes(buffer); + assertEquals("test", new String(buffer));message = consumer.receive(500); + assertNotNull(message); + assertNull(message.getUserID()); + buffer = new byte[message.getBodyBuffer().readableBytes()]; + message.getBodyBuffer().readBytes(buffer); + assertEquals("test", new String(buffer)); + } + @Test public void testGetCurrentDuplicateIdCacheSize() throws Exception { internalDuplicateIdTest(false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java index 5d2f5f5388..c512a44afa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java @@ -187,6 +187,17 @@ public class AddressControlUsingCoreTest extends AddressControlTest { String password) throws Exception { return (String) proxy.invokeOperation("sendMessage", headers, type, body, durable, user, password); } + + @Override + public String sendMessage(Map headers, + int type, + String body, + boolean durable, + String user, + String password, + boolean createMessageId) throws Exception { + return (String) proxy.invokeOperation("sendMessage", headers, type, body, durable, user, password, createMessageId); + } }; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 3665245800..9260971a81 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -3447,6 +3447,46 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(new String(body), "theBody"); } + @Test + public void testSendMessageWithMessageId() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable)); + + QueueControl queueControl = createManagementControl(address, queue); + + queueControl.sendMessage(new HashMap(), Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword"); + queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword", true); + + Wait.assertEquals(2, () -> getMessageCount(queueControl)); + + // the message IDs are set on the server + CompositeData[] browse = queueControl.browse(null); + + Assert.assertEquals(2, browse.length); + + byte[] body = (byte[]) browse[0].get(BODY); + + String messageID = (String) browse[0].get("userID"); + + Assert.assertEquals(0, messageID.length()); + + Assert.assertNotNull(body); + + Assert.assertEquals(new String(body), "theBody"); + + body = (byte[]) browse[1].get(BODY); + + messageID = (String) browse[1].get("userID"); + + Assert.assertTrue(messageID.length() > 0); + + Assert.assertNotNull(body); + + Assert.assertEquals(new String(body), "theBody"); + } + @Test public void testSendMessageWithProperties() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 6f8fe4c589..be9b65cc08 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -535,6 +535,17 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (String) proxy.invokeOperation("sendMessage", headers, type, body, durable, user, password); } + @Override + public String sendMessage(Map headers, + int type, + String body, + boolean durable, + String user, + String password, + boolean createMessageId) throws Exception { + return (String) proxy.invokeOperation("sendMessage", headers, type, body, durable, user, password, createMessageId); + } + public void setDeadLetterAddress(final String deadLetterAddress) throws Exception { proxy.invokeOperation("setDeadLetterAddress", deadLetterAddress); }