ARTEMIS-346 - Add Management send text message functionality similar to ActiveMQ

https://issues.apache.org/jira/browse/ARTEMIS-346
This commit is contained in:
Andy Taylor 2016-01-15 13:42:47 +00:00 committed by Clebert Suconic
parent 0a9a6c92f4
commit c1de710eb3
13 changed files with 310 additions and 4 deletions

View File

@ -306,6 +306,27 @@ public interface QueueControl {
@Operation(desc = "Send the messages corresponding to the given filter to this queue's Dead Letter Address", impact = MBeanOperationInfo.ACTION)
int sendMessagesToDeadLetterAddress(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filterStr) throws Exception;
/**
*
* @param headers the message headers and properties to set. Can only
* container Strings maped to primitive types.
* @param body the text to send
* @param userID
* @param durable
*@param user
* @param password @return
* @throws Exception
*/
@Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION)
String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map<String, String> headers,
@Parameter(name = "headers", desc = "A type for the message") final int type,
@Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body,
@Parameter(name = "body", desc = "The user ID to set on the message") String userID,
@Parameter(name = "durable", desc = "Whether the message is durable") boolean durable,
@Parameter(name = "user", desc = "The user to authenticate with") String user,
@Parameter(name = "password", desc = "The users password to authenticate with") String password) throws Exception;
/**
* Changes the message's priority corresponding to the specified message ID to the specified priority.
*

View File

@ -165,6 +165,69 @@ public interface JMSQueueControl extends DestinationControl {
@Operation(desc = "Send the messages corresponding to the given filter to this queue's Dead Letter Address", impact = MBeanOperationInfo.ACTION)
int sendMessagesToDeadLetterAddress(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filterStr) throws Exception;
/**
* Sends a TextMesage to the destination.
*
* @param body the text to send
* @return the message id of the message sent.
* @throws Exception
*/
@Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION)
String sendTextMessage(@Parameter(name = "body") String body) throws Exception;
/**
* Sends a TextMessage to the destination.
*
* @param properties the message properties to set as a comma sep name=value list. Can only
* contain Strings maped to primitive types or JMS properties. eg: body=hi,JMSReplyTo=Queue2
* @return the message id of the message sent.
* @throws Exception
*/
@Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION)
String sendTextMessageWithProperties(String properties) throws Exception;
/**
* Sends a TextMesage to the destination.
*
* @param headers the message headers and properties to set. Can only
* container Strings maped to primitive types.
* @param body the text to send
* @return the message id of the message sent.
* @throws Exception
*/
@Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION)
String sendTextMessage(@Parameter(name = "headers") Map<String,String> headers,
@Parameter(name = "body") String body) throws Exception;
/**
* Sends a TextMesage to the destination.
* @param body the text to send
* @param user
* @param password
* @return
* @throws Exception
*/
@Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION)
String sendTextMessage(@Parameter(name = "body") String body,
@Parameter(name = "user") String user,
@Parameter(name = "password") String password) throws Exception;
/**
*
* @param headers the message headers and properties to set. Can only
* container Strings maped to primitive types.
* @param body the text to send
* @param user
* @param password
* @return
* @throws Exception
*/
@Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION)
String sendTextMessage(@Parameter(name = "headers") Map<String,String> headers,
@Parameter(name = "body") String body,
@Parameter(name = "user") String user,
@Parameter(name = "password") String password) throws Exception;
/**
* Changes the message's priority corresponding to the specified message ID to the specified priority.
*

View File

@ -22,12 +22,17 @@ import javax.management.StandardMBean;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.Operation;
import org.apache.activemq.artemis.api.core.management.QueueControl;
@ -40,6 +45,8 @@ import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.SelectorTranslator;
import org.apache.activemq.artemis.jms.management.impl.openmbean.JMSOpenTypeSupport;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.json.JSONArray;
import org.apache.activemq.artemis.utils.json.JSONObject;
@ -294,6 +301,52 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
return coreQueueControl.sendMessagesToDeadLetterAddress(filter);
}
@Override
public String sendTextMessageWithProperties(String properties) throws Exception {
String[] kvs = properties.split(",");
Map<String, String> props = new HashMap<String, String>();
for (String kv : kvs) {
String[] it = kv.split("=");
if (it.length == 2) {
props.put(it[0],it[1]);
}
}
return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password"));
}
@Override
public String sendTextMessage(String body) throws Exception {
return sendTextMessage(Collections.EMPTY_MAP, body);
}
@Override
public String sendTextMessage(Map headers, String body) throws Exception {
return sendTextMessage(headers, body, null, null);
}
@Override
public String sendTextMessage(String body, String user, String password) throws Exception {
return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
}
@Override
public String sendTextMessage(Map<String, String> headers, String body, String user, String password) throws Exception {
boolean durable = false;
if (headers.containsKey("JMSDeliveryMode")) {
String jmsDeliveryMode = headers.remove("JMSDeliveryMode");
if (jmsDeliveryMode != null && (jmsDeliveryMode.equals("2") || jmsDeliveryMode.equalsIgnoreCase("PERSISTENT"))) {
durable = true;
}
}
String userID = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(56);
buffer.writeNullableSimpleString(new SimpleString(body));
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
coreQueueControl.sendMessage(headers, Message.TEXT_TYPE, Base64.encodeBytes(bytes), userID, durable, user, password);
return userID;
}
@Override
public boolean changeMessagePriority(final String messageID, final int newPriority) throws Exception {
String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);

View File

@ -38,15 +38,22 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.json.JSONArray;
import org.apache.activemq.artemis.utils.json.JSONException;
import org.apache.activemq.artemis.utils.json.JSONObject;
@ -64,6 +71,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
private final PostOffice postOffice;
private final StorageManager storageManager;
private final SecurityStore securityStore;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
private MessageCounter counter;
@ -106,11 +115,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
final String address,
final PostOffice postOffice,
final StorageManager storageManager,
final SecurityStore securityStore,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
super(QueueControl.class, storageManager);
this.queue = queue;
this.address = address;
this.postOffice = postOffice;
this.storageManager = storageManager;
this.securityStore = securityStore;
this.addressSettingsRepository = addressSettingsRepository;
}
@ -703,6 +715,45 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public String sendMessage(final Map<String, String> headers,
final int type,
final String body,
final String userID,
boolean durable, final String user,
final String password) throws Exception {
securityStore.check(queue.getAddress(), CheckType.SEND, new SecurityAuth() {
@Override
public String getUsername() {
return user;
}
@Override
public String getPassword() {
return password;
}
@Override
public RemotingConnection getRemotingConnection() {
return null;
}
});
ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
message.setUserID(new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(userID)));
if (body != null) {
message.getBodyBuffer().writeBytes(Base64.decode(body));
}
message.setAddress(queue.getAddress());
postOffice.route(message, null, true);
return "" + message.getMessageID();
}
@Override
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
checkStarted();

View File

@ -1637,7 +1637,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool, protocolManagerFactories, executorFactory.getExecutor(), serviceRegistry);
messagingServerControl = managementService.registerServer(postOffice, storageManager, configuration, addressSettingsRepository, securityRepository, resourceManager, remotingService, this, queueFactory, scheduledPool, pagingManager, haPolicy.isBackup());
messagingServerControl = managementService.registerServer(postOffice, securityStore, storageManager, configuration, addressSettingsRepository, securityRepository, resourceManager, remotingService, this, queueFactory, scheduledPool, pagingManager, haPolicy.isBackup());
// Address settings need to deployed initially, since they're require on paging manager.start()

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -66,6 +67,7 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
void setStorageManager(StorageManager storageManager);
ActiveMQServerControlImpl registerServer(final PostOffice postOffice,
final SecurityStore securityStore,
final StorageManager storageManager,
final Configuration configuration,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,

View File

@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@ -100,6 +101,8 @@ public class ManagementServiceImpl implements ManagementService {
private PostOffice postOffice;
private SecurityStore securityStore;
private PagingManager pagingManager;
private StorageManager storageManager;
@ -166,6 +169,7 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public ActiveMQServerControlImpl registerServer(final PostOffice postOffice,
final SecurityStore securityStore,
final StorageManager storageManager1,
final Configuration configuration,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
@ -178,6 +182,7 @@ public class ManagementServiceImpl implements ManagementService {
final PagingManager pagingManager,
final boolean backup) throws Exception {
this.postOffice = postOffice;
this.securityStore = securityStore;
this.addressSettingsRepository = addressSettingsRepository;
this.securityRepository = securityRepository;
this.storageManager = storageManager1;
@ -229,7 +234,7 @@ public class ManagementServiceImpl implements ManagementService {
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
final StorageManager storageManager) throws Exception {
QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), postOffice, storageManager, addressSettingsRepository);
QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);

View File

@ -90,7 +90,7 @@ public class ActiveMQJAASSecurityManager implements ActiveMQSecurityManager2 {
final String address,
final RemotingConnection connection) {
X509Certificate[] certificates = null;
if (connection.getTransportConnection() instanceof NettyConnection) {
if (connection != null && connection.getTransportConnection() instanceof NettyConnection) {
certificates = CertificateUtil.getCertsFromChannel(((NettyConnection) connection.getTransportConnection()).getChannel());
}
Subject localSubject;

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
@ -207,6 +208,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
@Override
public ActiveMQServerControlImpl registerServer(PostOffice postOffice,
SecurityStore securityStore,
StorageManager storageManager,
Configuration configuration,
HierarchicalRepository<AddressSettings> addressSettingsRepository,

View File

@ -29,6 +29,7 @@ import javax.management.openmbean.CompositeData;
import javax.naming.Context;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -155,6 +156,24 @@ public class JMSQueueControlTest extends ManagementTestBase {
Assert.assertEquals(0, data.length);
}
@Test
public void testSendTextMessage() throws Exception {
JMSQueueControl queueControl = createManagementControl();
Assert.assertEquals(0, getMessageCount(queueControl));
String id = queueControl.sendTextMessage(new HashMap<String, String>(), "theBody", "myUser", "myPassword");
Assert.assertEquals(1, getMessageCount(queueControl));
CompositeData[] data = queueControl.browse();
Assert.assertEquals(1, data.length);
Assert.assertEquals("ID:" + id, data[0].get("JMSMessageID"));
Assert.assertEquals("theBody", data[0].get("Text"));
System.out.println(data[0]);
}
@Test
public void testBrowseMessagesWithNullFilter() throws Exception {
JMSQueueControl queueControl = createManagementControl();

View File

@ -290,6 +290,31 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr);
}
@Override
public String sendTextMessage(@Parameter(name = "body") String body) throws Exception {
return null;
}
@Override
public String sendTextMessageWithProperties(String properties) throws Exception {
return null;
}
@Override
public String sendTextMessage(Map<String, String> headers, String body) throws Exception {
return null;
}
@Override
public String sendTextMessage(String body, String user, String password) throws Exception {
return null;
}
@Override
public String sendTextMessage(Map<String, String> headers, String body, String user, String password) throws Exception {
return (String) proxy.invokeOperation("sendTextMessage", headers, body, user, password);
}
public void setDeadLetterAddress(final String deadLetterAddress) throws Exception {
proxy.invokeOperation("setDeadLetterAddress", deadLetterAddress);
}

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.tests.integration.management;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -39,12 +41,14 @@ import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.json.JSONArray;
import org.apache.activemq.artemis.utils.json.JSONObject;
@ -2017,6 +2021,57 @@ public class QueueControlTest extends ManagementTestBase {
assertEquals(CoreNotificationType.BINDING_REMOVED.toString(), notif.getType());
}
@Test
public void testSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
queueControl.sendMessage(new HashMap<String, String>(), MessageImpl.TEXT_TYPE, Base64.encodeBytes("theBody".getBytes()), "myID", true, "myUser", "myPassword");
Assert.assertEquals(1, getMessageCount(queueControl));
// the message IDs are set on the server
CompositeData[] browse = queueControl.browse(null);
Assert.assertEquals(1, browse.length);
byte[] body = (byte[]) browse[0].get("body");
Assert.assertNotNull(body);
Assert.assertEquals(new String(body), "theBody");
}
@Test
public void testSendNullMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
queueControl.sendMessage(new HashMap<String, String>(), MessageImpl.TEXT_TYPE, null, "myID", true, "myUser", "myPassword");
Assert.assertEquals(1, getMessageCount(queueControl));
// the message IDs are set on the server
CompositeData[] browse = queueControl.browse(null);
Assert.assertEquals(1, browse.length);
byte[] body = (byte[]) browse[0].get("body");
Assert.assertNotNull(body);
Assert.assertEquals(new String(body), "");
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -307,6 +307,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr);
}
@Override
public String sendMessage(Map<String, String> headers, int type, String body, String userID, boolean durable, String user, String password) throws Exception {
return (String) proxy.invokeOperation("sendMessage", headers, type, body, userID, durable, user, password);
}
public void setDeadLetterAddress(final String deadLetterAddress) throws Exception {
proxy.invokeOperation("setDeadLetterAddress", deadLetterAddress);
}
@ -332,7 +337,12 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
@Override
public CompositeData[] browse(String filter) throws Exception {
return null;
Map map = (Map) proxy.invokeOperation("browse", filter);
CompositeData[] compositeDatas = (CompositeData[]) map.get(CompositeData.class.getName());
if (compositeDatas == null) {
compositeDatas = new CompositeData[0];
}
return compositeDatas;
}
@Override