ARTEMIS-604 - Add checks for object messages in REST and AMQP

- Rest interface fix
  - Doc fixes (Rest->REST)
  - JSON management and AMQP outbound
This commit is contained in:
Howard Gao 2016-08-04 09:39:20 +08:00 committed by Martyn Taylor
parent 0535218cfc
commit 2fb8341f8d
41 changed files with 1525 additions and 220 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
import org.apache.activemq.artemis.utils.StringEscapeUtils;
import javax.json.Json;
@ -32,7 +33,6 @@ import javax.json.JsonValue;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
@ -155,7 +155,8 @@ public final class JsonUtil {
CompositeData[] cds = new CompositeData[data.length];
for (int i1 = 0; i1 < data.length; i1++) {
String dataConverted = convertJsonValue(data[i1], String.class).toString();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(Base64.decode(dataConverted)));
ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(Base64.decode(dataConverted)));
ois.setWhiteList("java.util,java.lang,javax.management");
cds[i1] = (CompositeDataSupport) ois.readObject();
}
innerVal = cds;

View File

@ -24,18 +24,18 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSVendor;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMapMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSTextMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSVendor;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.utils.IDGenerator;
public class ActiveMQJMSVendor implements JMSVendor {
@ -116,12 +116,9 @@ public class ActiveMQJMSVendor implements JMSVendor {
return new ServerJMSMapMessage(wrapped, deliveryCount);
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
return new ServerJMSTextMessage(wrapped, deliveryCount);
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
return new ServerJMSObjectMessage(wrapped, deliveryCount);
default:
return new ServerJMSMessage(wrapped, deliveryCount);
}
}
@Override

View File

@ -34,6 +34,9 @@ public class MessageServiceConfiguration {
private String inVmId = "0";
private boolean useLinkHeaders = false;
private String deserializationWhiteList;
private String deserializationBlackList;
@XmlElement(name = "server-in-vm-id")
public String getInVmId() {
return inVmId;
@ -132,4 +135,20 @@ public class MessageServiceConfiguration {
public void setConsumerWindowSize(int consumerWindowSize) {
this.consumerWindowSize = consumerWindowSize;
}
public String getDeserializationWhiteList() {
return deserializationWhiteList;
}
public void setDeserializationWhiteList(String deserializationWhiteList) {
this.deserializationWhiteList = deserializationWhiteList;
}
public String getDeserializationBlackList() {
return deserializationBlackList;
}
public void setDeserializationBlackList(String deserializationBlackList) {
this.deserializationBlackList = deserializationBlackList;
}
}

View File

@ -23,6 +23,7 @@ import java.net.URL;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBContext;
@ -32,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.queue.DestinationSettings;
import org.apache.activemq.artemis.rest.queue.QueueServiceManager;
import org.apache.activemq.artemis.rest.topic.TopicServiceManager;
@ -46,8 +48,8 @@ import org.apache.activemq.artemis.utils.XMLUtil;
public class MessageServiceManager {
protected ExecutorService threadPool;
protected QueueServiceManager queueManager = new QueueServiceManager();
protected TopicServiceManager topicManager = new TopicServiceManager();
protected QueueServiceManager queueManager;
protected TopicServiceManager topicManager;
protected TimeoutTask timeoutTask;
protected int timeoutTaskInterval = 1;
protected MessageServiceConfiguration configuration = new MessageServiceConfiguration();
@ -55,6 +57,13 @@ public class MessageServiceManager {
protected String configResourcePath;
protected BindingRegistry registry;
private ClientSessionFactory consumerSessionFactory;
public MessageServiceManager(ConnectionFactoryOptions jmsOptions) {
queueManager = new QueueServiceManager(jmsOptions);
topicManager = new TopicServiceManager(jmsOptions);
}
public BindingRegistry getRegistry() {
return registry;
}
@ -147,7 +156,7 @@ public class MessageServiceManager {
consumerLocator.setConsumerWindowSize(configuration.getConsumerWindowSize());
}
ClientSessionFactory consumerSessionFactory = consumerLocator.createSessionFactory();
consumerSessionFactory = consumerLocator.createSessionFactory();
ActiveMQRestLogger.LOGGER.debug("Created ClientSessionFactory: " + consumerSessionFactory);
ServerLocator defaultLocator = new ServerLocatorImpl(false, new TransportConfiguration(InVMConnectorFactory.class.getName(), transportConfig));
@ -197,5 +206,13 @@ public class MessageServiceManager {
if (topicManager != null)
topicManager.stop();
topicManager = null;
this.timeoutTask.stop();
threadPool.shutdown();
try {
threadPool.awaitTermination(5000, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
}
this.consumerSessionFactory.close();
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.rest.integration;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.jboss.resteasy.plugins.server.tjws.TJWSEmbeddedJaxrsServer;
import org.apache.activemq.artemis.rest.MessageServiceManager;
import org.jboss.resteasy.test.TestPortProvider;
@ -25,13 +26,14 @@ public class EmbeddedRestActiveMQ {
protected TJWSEmbeddedJaxrsServer tjws = new TJWSEmbeddedJaxrsServer();
protected EmbeddedActiveMQ embeddedActiveMQ;
protected MessageServiceManager manager = new MessageServiceManager();
protected MessageServiceManager manager = new MessageServiceManager(null);
public EmbeddedRestActiveMQ() {
public EmbeddedRestActiveMQ(ConnectionFactoryOptions jmsOptions) {
int port = TestPortProvider.getPort();
tjws.setPort(port);
tjws.setRootResourcePath("");
tjws.setSecurityDomain(null);
manager = new MessageServiceManager(jmsOptions);
initEmbeddedActiveMQ();
}

View File

@ -16,11 +16,16 @@
*/
package org.apache.activemq.artemis.rest.integration;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
public class EmbeddedRestActiveMQJMS extends EmbeddedRestActiveMQ {
public EmbeddedRestActiveMQJMS(ConnectionFactoryOptions jmsOptions) {
super(jmsOptions);
}
@Override
protected void initEmbeddedActiveMQ() {
embeddedActiveMQ = new EmbeddedJMS();

View File

@ -20,22 +20,28 @@ import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.MessageServiceManager;
import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
import org.jboss.resteasy.spi.Registry;
public class RestMessagingBootstrapListener implements ServletContextListener {
public class RestMessagingBootstrapListener implements ServletContextListener, ConnectionFactoryOptions {
MessageServiceManager manager;
private String deserializationBlackList;
private String deserializationWhiteList;
@Override
public void contextInitialized(ServletContextEvent contextEvent) {
ServletContext context = contextEvent.getServletContext();
String configfile = context.getInitParameter("rest.messaging.config.file");
Registry registry = (Registry) context.getAttribute(Registry.class.getName());
if (registry == null) {
throw new RuntimeException("You must install RESTEasy as a Bootstrap Listener and it must be listed before this class");
}
manager = new MessageServiceManager();
String configfile = context.getInitParameter("rest.messaging.config.file");
deserializationBlackList = context.getInitParameter(ObjectInputStreamWithClassLoader.BLACKLIST_PROPERTY);
deserializationWhiteList = context.getInitParameter(ObjectInputStreamWithClassLoader.WHITELIST_PROPERTY);
manager = new MessageServiceManager(this);
if (configfile != null) {
manager.setConfigResourcePath(configfile);
@ -56,4 +62,24 @@ public class RestMessagingBootstrapListener implements ServletContextListener {
manager.stop();
}
}
@Override
public String getDeserializationBlackList() {
return deserializationBlackList;
}
@Override
public void setDeserializationBlackList(String blackList) {
deserializationBlackList = blackList;
}
@Override
public String getDeserializationWhiteList() {
return deserializationWhiteList;
}
@Override
public void setDeserializationWhiteList(String whiteList) {
deserializationWhiteList = whiteList;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.rest.queue;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.HttpHeaderProperty;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
@ -51,13 +52,13 @@ public abstract class ConsumedMessage {
}
}
public static ConsumedMessage createConsumedMessage(ClientMessage message) {
public static ConsumedMessage createConsumedMessage(ClientMessage message, ConnectionFactoryOptions options) {
Boolean aBoolean = message.getBooleanProperty(POSTED_AS_HTTP_MESSAGE);
if (aBoolean != null && aBoolean.booleanValue()) {
return new ConsumedHttpMessage(message);
}
else if (message.getType() == Message.OBJECT_TYPE) {
return new ConsumedObjectMessage(message);
return new ConsumedObjectMessage(message, options);
}
else {
throw new IllegalArgumentException("ClientMessage must be an HTTP message or an Object message: " + message + " type: " + message.getType());

View File

@ -18,19 +18,22 @@ package org.apache.activemq.artemis.rest.queue;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
public class ConsumedObjectMessage extends ConsumedMessage {
protected Object readObject;
private ConnectionFactoryOptions options;
public ConsumedObjectMessage(ClientMessage message) {
public ConsumedObjectMessage(ClientMessage message, ConnectionFactoryOptions options) {
super(message);
if (message.getType() != Message.OBJECT_TYPE)
throw new IllegalArgumentException("Client message must be an OBJECT_TYPE");
this.options = options;
}
@Override
@ -43,7 +46,11 @@ public class ConsumedObjectMessage extends ConsumedMessage {
message.getBodyBuffer().readBytes(body);
ByteArrayInputStream bais = new ByteArrayInputStream(body);
try {
ObjectInputStream ois = new ObjectInputStream(bais);
ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(bais);
if (options != null) {
ois.setWhiteList(options.getDeserializationWhiteList());
ois.setBlackList(options.getDeserializationBlackList());
}
readObject = ois.readObject();
}
catch (Exception e) {

View File

@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.util.LinkStrategy;
import org.apache.activemq.artemis.rest.util.TimeoutTask;
import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
@ -40,6 +41,12 @@ public abstract class DestinationServiceManager {
protected LinkStrategy linkStrategy;
protected BindingRegistry registry;
protected ConnectionFactoryOptions jmsOptions;
public DestinationServiceManager(ConnectionFactoryOptions jmsOptions) {
this.jmsOptions = jmsOptions;
}
public BindingRegistry getRegistry() {
return registry;
}
@ -157,4 +164,8 @@ public abstract class DestinationServiceManager {
public abstract void start() throws Exception;
public abstract void stop();
public ConnectionFactoryOptions getJmsOptions() {
return jmsOptions;
}
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
import org.apache.activemq.artemis.rest.util.Constants;
import org.apache.activemq.artemis.rest.util.LinkStrategy;
@ -179,7 +180,7 @@ public class QueueConsumer {
return builder.build();
}
previousIndex = index;
lastConsumed = ConsumedMessage.createConsumedMessage(message);
lastConsumed = ConsumedMessage.createConsumedMessage(message, this.getJmsOptions());
String token = Long.toString(lastConsumed.getMessageID());
Response response = getMessageResponse(lastConsumed, info, basePath, token).build();
if (autoAck)
@ -187,7 +188,9 @@ public class QueueConsumer {
return response;
}
catch (Exception e) {
throw new RuntimeException(e);
Response errorResponse = Response.serverError().entity(e.getMessage())
.status(Response.Status.INTERNAL_SERVER_ERROR).build();
return errorResponse;
}
}
@ -264,4 +267,8 @@ public class QueueConsumer {
String uri = builder.build().toString();
serviceManager.getLinkStrategy().setLinkHeader(response, "consumer", "consumer", uri, MediaType.APPLICATION_XML);
}
public ConnectionFactoryOptions getJmsOptions() {
return serviceManager.getJmsOptions();
}
}

View File

@ -145,6 +145,7 @@ public class QueueDestinationsResource {
PushConsumerResource push = new PushConsumerResource();
push.setDestination(queueName);
push.setSessionFactory(manager.getConsumerSessionFactory());
push.setJmsOptions(manager.getJmsOptions());
queueResource.setPushConsumers(push);
PostMessage sender = null;

View File

@ -21,6 +21,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.queue.push.PushStore;
import org.apache.activemq.artemis.rest.queue.push.FilePushStore;
@ -30,6 +31,10 @@ public class QueueServiceManager extends DestinationServiceManager {
protected List<QueueDeployment> queues = new ArrayList<>();
protected QueueDestinationsResource destination;
public QueueServiceManager(ConnectionFactoryOptions jmsOptions) {
super(jmsOptions);
}
public List<QueueDeployment> getQueues() {
return queues;
}

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
import org.apache.activemq.artemis.rest.queue.push.xml.PushRegistration;
import org.apache.activemq.artemis.utils.SelectorTranslator;
@ -38,16 +39,20 @@ public class PushConsumer {
protected PushStrategy strategy;
protected PushStore store;
private ConnectionFactoryOptions jmsOptions;
public PushConsumer(ClientSessionFactory factory,
String destination,
String id,
PushRegistration registration,
PushStore store) {
PushStore store,
ConnectionFactoryOptions jmsOptions) {
this.factory = factory;
this.destination = destination;
this.id = id;
this.registration = registration;
this.store = store;
this.jmsOptions = jmsOptions;
}
public PushStrategy getStrategy() {
@ -79,6 +84,7 @@ public class PushConsumer {
strategy = new UriStrategy();
}
strategy.setRegistration(registration);
strategy.setJmsOptions(jmsOptions);
strategy.start();
sessions = new ArrayList<>();

View File

@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.queue.push.xml.PushRegistration;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
@ -45,6 +46,8 @@ public class PushConsumerResource {
protected final AtomicLong sessionCounter = new AtomicLong(1);
protected PushStore pushStore;
private ConnectionFactoryOptions jmsOptions;
public void start() {
}
@ -66,7 +69,7 @@ public class PushConsumerResource {
public void addRegistration(PushRegistration reg) throws Exception {
if (reg.isEnabled() == false)
return;
PushConsumer consumer = new PushConsumer(sessionFactory, destination, reg.getId(), reg, pushStore);
PushConsumer consumer = new PushConsumer(sessionFactory, destination, reg.getId(), reg, pushStore, jmsOptions);
consumer.start();
consumers.put(reg.getId(), consumer);
}
@ -80,7 +83,7 @@ public class PushConsumerResource {
String genId = sessionCounter.getAndIncrement() + "-" + startup;
registration.setId(genId);
registration.setDestination(destination);
PushConsumer consumer = new PushConsumer(sessionFactory, destination, genId, registration, pushStore);
PushConsumer consumer = new PushConsumer(sessionFactory, destination, genId, registration, pushStore, jmsOptions);
try {
consumer.start();
if (registration.isDurable() && pushStore != null) {
@ -142,4 +145,8 @@ public class PushConsumerResource {
public void setDestination(String destination) {
this.destination = destination;
}
public void setJmsOptions(ConnectionFactoryOptions jmsOptions) {
this.jmsOptions = jmsOptions;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.rest.queue.push;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.queue.push.xml.PushRegistration;
public interface PushStrategy {
@ -36,4 +37,6 @@ public interface PushStrategy {
void start() throws Exception;
void stop() throws Exception;
void setJmsOptions(ConnectionFactoryOptions jmsOptions);
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.rest.queue.push;
import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.queue.push.xml.BasicAuth;
import org.apache.activemq.artemis.rest.queue.push.xml.PushRegistration;
import org.apache.activemq.artemis.rest.util.HttpMessageHelper;
@ -59,6 +60,8 @@ public class UriStrategy implements PushStrategy {
protected String method;
protected String contentType;
protected ConnectionFactoryOptions jmsOptions;
UriStrategy() {
connManager.setDefaultMaxPerRoute(100);
connManager.setMaxTotal(1000);
@ -105,6 +108,11 @@ public class UriStrategy implements PushStrategy {
connManager.shutdown();
}
@Override
public void setJmsOptions(ConnectionFactoryOptions jmsOptions) {
this.jmsOptions = jmsOptions;
}
@Override
public boolean push(ClientMessage message) {
ActiveMQRestLogger.LOGGER.debug("Pushing " + message);
@ -120,7 +128,7 @@ public class UriStrategy implements PushStrategy {
ActiveMQRestLogger.LOGGER.debug("Setting XmlHttpHeader: " + header.getName() + "=" + header.getValue());
request.header(header.getName(), header.getValue());
}
HttpMessageHelper.buildMessage(message, request, contentType);
HttpMessageHelper.buildMessage(message, request, contentType, jmsOptions);
ClientResponse<?> res = null;
try {
ActiveMQRestLogger.LOGGER.debug(method + " " + uri);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.rest.topic;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.queue.push.PushStore;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
import org.apache.activemq.artemis.rest.queue.push.PushConsumer;
@ -30,8 +31,9 @@ public class PushSubscription extends PushConsumer {
String destination,
String id,
PushRegistration registration,
PushStore store) {
super(factory, destination, id, registration, store);
PushStore store,
ConnectionFactoryOptions jmsOptions) {
super(factory, destination, id, registration, store, jmsOptions);
}
@Override

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.queue.push.PushConsumer;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
@ -47,6 +48,12 @@ public class PushSubscriptionsResource {
protected final AtomicLong sessionCounter = new AtomicLong(1);
protected TopicPushStore pushStore;
private ConnectionFactoryOptions jmsOptions;
public PushSubscriptionsResource(ConnectionFactoryOptions jmsOptions) {
this.jmsOptions = jmsOptions;
}
public void stop() {
for (PushConsumer consumer : consumers.values()) {
consumer.stop();
@ -92,7 +99,7 @@ public class PushSubscriptionsResource {
if (!query.isExists()) {
createSession = createSubscription(destination, reg.isDurable());
}
PushSubscription consumer = new PushSubscription(sessionFactory, reg.getDestination(), reg.getId(), reg, pushStore);
PushSubscription consumer = new PushSubscription(sessionFactory, reg.getDestination(), reg.getId(), reg, pushStore, jmsOptions);
try {
consumer.start();
}
@ -133,7 +140,7 @@ public class PushSubscriptionsResource {
registration.setTopic(destination);
ClientSession createSession = createSubscription(genId, registration.isDurable());
try {
PushSubscription consumer = new PushSubscription(sessionFactory, genId, genId, registration, pushStore);
PushSubscription consumer = new PushSubscription(sessionFactory, genId, genId, registration, pushStore, jmsOptions);
try {
consumer.start();
if (registration.isDurable() && pushStore != null) {

View File

@ -137,7 +137,7 @@ public class TopicDestinationsResource {
subscriptionsResource.setDestination(topicName);
subscriptionsResource.setSessionFactory(manager.getConsumerSessionFactory());
PushSubscriptionsResource push = new PushSubscriptionsResource();
PushSubscriptionsResource push = new PushSubscriptionsResource(manager.getJmsOptions());
push.setDestination(topicName);
push.setSessionFactory(manager.getConsumerSessionFactory());
topicResource.setPushSubscriptions(push);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.rest.topic;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.queue.DestinationServiceManager;
import java.util.ArrayList;
@ -29,6 +30,10 @@ public class TopicServiceManager extends DestinationServiceManager {
protected List<TopicDeployment> topics = new ArrayList<>();
protected TopicDestinationsResource destination;
public TopicServiceManager(ConnectionFactoryOptions jmsOptions) {
super(jmsOptions);
}
public TopicPushStore getPushStore() {
return pushStore;
}

View File

@ -18,15 +18,15 @@ package org.apache.activemq.artemis.rest.util;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.rest.HttpHeaderProperty;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
import org.jboss.resteasy.client.ClientRequest;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;
import java.util.Map.Entry;
@ -39,39 +39,7 @@ public class HttpMessageHelper {
return lowerKey.toLowerCase().startsWith("content") || lowerKey.toLowerCase().equals("link");
}
public static void buildMessage(ClientMessage message, Response.ResponseBuilder builder) {
for (SimpleString key : message.getPropertyNames()) {
String k = key.toString();
String headerName = HttpHeaderProperty.fromPropertyName(k);
if (headerName == null) {
continue;
}
builder.header(headerName, message.getStringProperty(k));
}
int size = message.getBodySize();
if (size > 0) {
byte[] body = new byte[size];
message.getBodyBuffer().readBytes(body);
Boolean aBoolean = message.getBooleanProperty(POSTED_AS_HTTP_MESSAGE);
if (aBoolean != null && aBoolean.booleanValue()) {
builder.entity(body);
}
else {
ByteArrayInputStream bais = new ByteArrayInputStream(body);
Object obj = null;
try {
ObjectInputStream ois = new ObjectInputStream(bais);
obj = ois.readObject();
}
catch (Exception e) {
throw new RuntimeException(e);
}
builder.entity(obj);
}
}
}
public static void buildMessage(ClientMessage message, ClientRequest request, String contentType) {
public static void buildMessage(ClientMessage message, ClientRequest request, String contentType, ConnectionFactoryOptions jmsOptions) {
for (SimpleString key : message.getPropertyNames()) {
String k = key.toString();
String headerName = HttpHeaderProperty.fromPropertyName(k);
@ -105,12 +73,17 @@ public class HttpMessageHelper {
ByteArrayInputStream bais = new ByteArrayInputStream(body);
Object obj = null;
try {
ObjectInputStream ois = new ObjectInputStream(bais);
ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(bais);
if (jmsOptions != null) {
ois.setBlackList(jmsOptions.getDeserializationBlackList());
ois.setWhiteList(jmsOptions.getDeserializationWhiteList());
}
obj = ois.readObject();
ActiveMQRestLogger.LOGGER.debug("**** Building Message from object: " + obj.toString());
request.body(contentType, obj);
}
catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}

View File

@ -78,8 +78,10 @@ public class TimeoutTask implements Runnable {
public synchronized void stop() {
running = false;
if (thread != null) {
thread.interrupt();
}
}
public synchronized int getInterval() {
return interval;

View File

@ -29,7 +29,7 @@ import org.jboss.resteasy.test.TestPortProvider;
public class Embedded {
protected MessageServiceManager manager = new MessageServiceManager();
protected MessageServiceManager manager = new MessageServiceManager(null);
protected MessageServiceConfiguration config = new MessageServiceConfiguration();
protected ActiveMQServer activeMQServer;
protected TJWSEmbeddedJaxrsServer tjws = new TJWSEmbeddedJaxrsServer();

View File

@ -48,7 +48,7 @@ public class EmbeddedTest {
@BeforeClass
public static void startEmbedded() throws Exception {
server = new EmbeddedRestActiveMQJMS();
server = new EmbeddedRestActiveMQJMS(null);
server.getManager().setConfigResourcePath("activemq-rest.xml");
SecurityConfiguration securityConfiguration = new SecurityConfiguration();
securityConfiguration.addUser("guest", "guest");

View File

@ -52,7 +52,7 @@ public class PersistentPushQueueConsumerTest {
activeMQServer.start();
deployment = EmbeddedContainer.start();
manager = new MessageServiceManager();
manager = new MessageServiceManager(null);
manager.start();
deployment.getRegistry().addSingletonResource(manager.getQueueManager().getDestination());
deployment.getRegistry().addSingletonResource(manager.getTopicManager().getDestination());

View File

@ -72,7 +72,7 @@ public class PersistentPushTopicConsumerTest {
public static void startup() throws Exception {
deployment = EmbeddedContainer.start();
manager = new MessageServiceManager();
manager = new MessageServiceManager(null);
manager.start();
deployment.getRegistry().addSingletonResource(manager.getQueueManager().getDestination());
deployment.getRegistry().addSingletonResource(manager.getTopicManager().getDestination());

View File

@ -757,10 +757,24 @@ properties for their resource adapters. There are two properties that you can co
These properties, once specified, are eventually set on the corresponding internal factories.
### Specifying black list and white list for REST interface
Apache Artemis REST interface ([Rest](rest.md)) allows interactions between jms client and rest clients.
It uses JMS ObjectMessage to wrap the actual user data between the 2 types of clients and deserialization
is needed during this process. If you want to control the deserialization for REST, you need to set the
black/white lists for it separately as Apache Artemis REST Interface is deployed as a web application.
You need to put the black/white lists in its web.xml, as context parameters, as follows
<web-app>
<context-param>
<param-name>org.apache.activemq.artemis.jms.deserialization.whitelist</param-name>
<param-value>some.allowed.class</param-value>
</context-param>
<context-param>
<param-name>org.apache.activemq.artemis.jms.deserialization.blacklist</param-name>
<param-value>some.forbidden.class</param-value>
</context-param>
...
</web-app>
The param-value for each list is a comma separated string value representing the list.

View File

@ -349,6 +349,26 @@
<artifactId>artemis-test-support</artifactId>
<version>${project.version}</version>
</dependency>
<!-- rest test -->
<dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<version>${jetty.version}</version>
<type>jar</type>
<classifier>uber</classifier>
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-jaxrs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq.rest</groupId>
<artifactId>artemis-rest</artifactId>
<version>1.4.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
@ -356,9 +376,44 @@
<testResource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
<excludes>
<exclude>**/rest/*.xml</exclude>
</excludes>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>process-test-classes</phase>
<id>bwlist-rest-test-war</id>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptor>src/test/resources/rest/bwlist-rest-test-asm.xml</descriptor>
<finalName>rest-test-bwlist</finalName>
<appendAssemblyId>false</appendAssemblyId>
<outputDirectory>target/test-classes/rest/</outputDirectory>
</configuration>
</execution>
<execution>
<phase>process-test-classes</phase>
<id>rest-test-war</id>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptor>src/test/resources/rest/rest-test-asm.xml</descriptor>
<finalName>rest-test</finalName>
<appendAssemblyId>false</appendAssemblyId>
<outputDirectory>target/test-classes/rest/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.rest;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
@XmlRootElement(name = "order")
public class Order implements Serializable {
private static final long serialVersionUID = -3462346058107018735L;
private String name;
private String amount;
private String item;
public Order() {
}
public Order(String name, String amount, String item) {
assert (name != null);
assert (amount != null);
assert (item != null);
this.name = name;
this.amount = amount;
this.item = item;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAmount() {
return amount;
}
public void setAmount(String amount) {
this.amount = amount;
}
public String getItem() {
return item;
}
public void setItem(String item) {
this.item = item;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof Order)) {
return false;
}
Order order = (Order) other;
return name.equals(order.name) && amount.equals(order.amount) && item.equals(order.item);
}
@Override
public int hashCode() {
return name.hashCode() + amount.hashCode() + item.hashCode();
}
}

View File

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.rest;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.File;
import java.io.Serializable;
import java.io.StringReader;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.rest.HttpHeaderProperty;
import org.apache.activemq.artemis.tests.integration.rest.util.RestAMQConnection;
import org.apache.activemq.artemis.tests.integration.rest.util.RestMessageContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class RestDeserializationTest extends RestTestBase {
private RestAMQConnection restConnection;
@Before
public void setUp() throws Exception {
super.setUp();
createJettyServer("localhost", 12345);
}
@After
public void tearDown() throws Exception {
if (restConnection != null) {
restConnection.close();
}
super.tearDown();
}
@Test
public void testWithoutBlackWhiteListQueue() throws Exception {
deployAndconfigureRESTService("rest-test.war");
Order order = new Order();
order.setName("Bill");
order.setItem("iPhone4");
order.setAmount("$199.99");
jmsSendMessage(order, "orders", true);
String received = restReceiveQueueMessage("orders");
Object object = xmlToObject(received);
assertEquals(order, object);
}
@Test
public void testWithoutBlackWhiteListTopic() throws Exception {
deployAndconfigureRESTService("rest-test.war");
RestMessageContext topicContext = restConnection.createTopicContext("ordersTopic");
topicContext.initPullConsumers();
Order order = new Order();
order.setName("Bill");
order.setItem("iPhone4");
order.setAmount("$199.99");
jmsSendMessage(order, "ordersTopic", false);
String received = topicContext.pullMessage();
Object object = xmlToObject(received);
assertEquals(order, object);
}
@Test
public void testBlackWhiteListQueuePull() throws Exception {
deployAndconfigureRESTService("rest-test-bwlist.war");
Order order = new Order();
order.setName("Bill");
order.setItem("iPhone4");
order.setAmount("$199.99");
jmsSendMessage(order, "orders", true);
try {
String received = restReceiveQueueMessage("orders");
fail("Object should be rejected by blacklist, but " + received);
}
catch (IllegalStateException e) {
String error = e.getMessage();
assertTrue(error, error.contains("ClassNotFoundException"));
}
}
@Test
public void testBlackWhiteListTopicPull() throws Exception {
deployAndconfigureRESTService("rest-test-bwlist.war");
RestMessageContext topicContext = restConnection.createTopicContext("ordersTopic");
topicContext.initPullConsumers();
Order order = new Order();
order.setName("Bill");
order.setItem("iPhone4");
order.setAmount("$199.99");
jmsSendMessage(order, "ordersTopic", false);
try {
String received = topicContext.pullMessage();
fail("object should have been rejected but: " + received);
}
catch (IllegalStateException e) {
String error = e.getMessage();
assertTrue(error, error.contains("ClassNotFoundException"));
}
}
private void deployAndconfigureRESTService(String warFileName) throws Exception {
jmsServer.createTopic(false, "ordersTopic", (String[]) null);
File warFile = getResourceFile("/rest/" + warFileName, warFileName);
deployWebApp("/restapp", warFile);
server.start();
String uri = server.getURI().toASCIIString();
System.out.println("Sever started with uri: " + uri);
restConnection = new RestAMQConnection(uri);
}
private Object xmlToObject(String xmlString) throws JAXBException {
JAXBContext jc = JAXBContext.newInstance(Order.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
StringReader reader = new StringReader(xmlString);
return unmarshaller.unmarshal(reader);
}
private String restReceiveQueueMessage(String destName) throws Exception {
RestMessageContext restContext = restConnection.createQueueContext(destName);
String val = restContext.pullMessage();
return val;
}
private void jmsSendMessage(Serializable value, String destName, boolean isQueue) throws JMSException {
ConnectionFactory factory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
String jmsDest;
if (isQueue) {
jmsDest = "jms.queue." + destName;
}
else {
jmsDest = "jms.topic." + destName;
}
Destination destination = ActiveMQDestination.fromAddress(jmsDest);
Connection conn = factory.createConnection();
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
ObjectMessage message = session.createObjectMessage();
message.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, "application/xml");
message.setObject(value);
producer.send(message);
}
finally {
conn.close();
}
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.rest;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.webapp.WebAppContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import shaded.org.apache.commons.io.FileUtils;
public class RestTestBase extends JMSTestBase {
@Rule
public TemporaryFolder testFolder = new TemporaryFolder();
protected Server server;
protected File webAppDir;
protected HandlerList handlers;
@Before
public void setUp() throws Exception {
super.setUp();
webAppDir = testFolder.newFolder("test-apps");
}
@After
public void tearDown() throws Exception {
if (server != null) {
try {
server.stop();
}
catch (Throwable t) {
t.printStackTrace();
}
}
super.tearDown();
}
public Server createJettyServer(String host, int port) throws Exception {
server = new Server();
ServerConnector connector = new ServerConnector(server);
connector.setHost(host);
connector.setPort(port);
server.setConnectors(new Connector[]{connector});
handlers = new HandlerList();
server.setHandler(handlers);
return server;
}
public WebAppContext deployWebApp(String contextPath, File warFile) {
WebAppContext webapp = new WebAppContext();
if (contextPath.startsWith("/")) {
webapp.setContextPath(contextPath);
}
else {
webapp.setContextPath("/" + contextPath);
}
webapp.setWar(warFile.getAbsolutePath());
handlers.addHandler(webapp);
return webapp;
}
public File getResourceFile(String resPath, String warName) throws IOException {
InputStream input = RestTestBase.class.getResourceAsStream(resPath);
File result = new File(webAppDir, warName);
FileUtils.copyInputStreamToFile(input, result);
return result;
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.rest.util;
import java.io.IOException;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
public class QueueRestMessageContext extends RestMessageContext {
public static final String PREFIX_QUEUE = "/queues/jms.queue.";
public QueueRestMessageContext(RestAMQConnection restAMQConnection, String queue) throws IOException {
super(restAMQConnection, queue);
}
@Override
protected String getDestLink() {
return PREFIX_QUEUE + destination;
}
@Override
protected String getPullConsumerUri() {
return getDestLink() + "/pull-consumers";
}
@Override
public void initPullConsumers() throws IOException {
String pullUri = getPullConsumerUri();
CloseableHttpResponse response = null;
if (!this.autoAck) {
response = connection.post(pullUri, "application/x-www-form-urlencoded", "autoAck=false");
}
else {
response = connection.post(pullUri);
}
try {
int code = ResponseUtil.getHttpCode(response);
if (code == 201) {
Header header = response.getFirstHeader("Location");
contextMap.put(KEY_PULL_CONSUMERS_LOC, header.getValue());
header = response.getFirstHeader(KEY_MSG_CONSUME_NEXT);
contextMap.put(KEY_MSG_CONSUME_NEXT, header.getValue());
header = response.getFirstHeader(KEY_MSG_ACK_NEXT);
if (header != null) {
contextMap.put(KEY_MSG_ACK_NEXT, header.getValue());
}
}
}
finally {
response.close();
}
}
@Override
protected String getPushLink(String pushTarget) {
return PREFIX_QUEUE + pushTarget;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.rest.util;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
public class ResponseUtil {
public static int getHttpCode(CloseableHttpResponse response) {
return response.getStatusLine().getStatusCode();
}
public static String getDetails(CloseableHttpResponse response) throws IOException {
HttpEntity entity = response.getEntity();
return EntityUtils.toString(entity);
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.rest.util;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
public class RestAMQConnection implements Closeable {
private CloseableHttpClient httpClient = HttpClients.createDefault();
private String targetUri;
private List<RestMessageContext> contexts = new ArrayList<>();
public RestAMQConnection(String targetUri) {
this.targetUri = targetUri;
}
public QueueRestMessageContext createQueueContext(String queue) throws Exception {
QueueRestMessageContext ctx = new QueueRestMessageContext(this, queue);
contexts.add(ctx);
return ctx;
}
public TopicRestMessageContext createTopicContext(String topic) throws Exception {
TopicRestMessageContext ctx = new TopicRestMessageContext(this, topic, false);
contexts.add(ctx);
return ctx;
}
@Override
public void close() throws IOException {
for (RestMessageContext ctx : contexts) {
ctx.close();
}
httpClient.close();
}
private String getFullLink(String link) {
if (link.startsWith("http:")) {
return link;
}
return targetUri + link;
}
public CloseableHttpResponse request(String destLink) throws IOException {
String fullLink = getFullLink(destLink);
HttpGet request = new HttpGet(fullLink);
CloseableHttpResponse resp = httpClient.execute(request);
return resp;
}
public CloseableHttpResponse post(String uri, String contentType, String body) throws IOException {
String fullLink = getFullLink(uri);
HttpPost post = new HttpPost(fullLink);
StringEntity entity = new StringEntity(body,
ContentType.create(contentType));
post.setEntity(entity);
CloseableHttpResponse resp = httpClient.execute(post);
return resp;
}
public CloseableHttpResponse post(String uri) throws IOException {
String fullLink = getFullLink(uri);
HttpPost post = new HttpPost(fullLink);
CloseableHttpResponse resp = httpClient.execute(post);
return resp;
}
public void delete(String uri) throws IOException {
String consumerUri = getFullLink(uri);
HttpDelete delete = new HttpDelete(consumerUri);
CloseableHttpResponse resp = httpClient.execute(delete);
}
public String getTargetUri() {
return this.targetUri;
}
}

View File

@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.rest.util;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.util.EntityUtils;
public abstract class RestMessageContext implements Closeable {
public static final String KEY_MSG_CREATE = "msg-create";
public static final String KEY_MSG_CREATE_ID = "msg-create-with-id";
public static final String KEY_MSG_PULL = "msg-pull-consumers";
public static final String KEY_MSG_PUSH = "msg-push-consumers";
public static final String KEY_MSG_PULL_SUB = "msg-pull-subscriptions";
public static final String KEY_MSG_PUSH_SUB = "msg-push-subscriptions";
public static final String KEY_MSG_CREATE_NEXT = "msg-create-next";
public static final String KEY_PULL_CONSUMERS_LOC = "pull-consumers-location";
public static final String KEY_MSG_CONSUME_NEXT = "msg-consume-next";
public static final String KEY_MSG_CONSUMER = "msg-consumer";
public static final String KEY_MSG_ACK_NEXT = "msg-acknowledge-next";
public static final String KEY_MSG_ACK = "msg-acknowledgement";
protected RestAMQConnection connection;
protected String destination;
protected Map<String, String> contextMap = new HashMap<>();
// consumer options
protected boolean autoAck;
protected boolean pushConsumer;
public RestMessageContext(RestAMQConnection restAMQConnection, String dest) throws IOException {
this(restAMQConnection, dest, true, false);
}
public RestMessageContext(RestAMQConnection restAMQConnection, String dest, boolean isAutoAck, boolean isPush) throws IOException {
this.connection = restAMQConnection;
this.destination = dest;
this.autoAck = isAutoAck;
this.pushConsumer = isPush;
prepareSelf();
}
private void prepareSelf() throws IOException {
String destLink = getDestLink();
CloseableHttpResponse response = connection.request(destLink);
int code = ResponseUtil.getHttpCode(response);
if (code != 200) {
System.out.println("failed to init " + destLink);
System.out.println("reason: " + ResponseUtil.getDetails(response));
}
try {
Header header = response.getFirstHeader(KEY_MSG_CREATE);
contextMap.put(KEY_MSG_CREATE, header.getValue());
header = response.getFirstHeader(KEY_MSG_CREATE_ID);
contextMap.put(KEY_MSG_CREATE_ID, header.getValue());
header = response.getFirstHeader(KEY_MSG_PULL);
if (header != null) {
contextMap.put(KEY_MSG_PULL, header.getValue());
}
header = response.getFirstHeader(KEY_MSG_PUSH);
if (header != null) {
contextMap.put(KEY_MSG_PUSH, header.getValue());
}
header = response.getFirstHeader(KEY_MSG_PULL_SUB);
if (header != null) {
contextMap.put(KEY_MSG_PULL_SUB, header.getValue());
}
header = response.getFirstHeader(KEY_MSG_PUSH_SUB);
if (header != null) {
contextMap.put(KEY_MSG_PUSH_SUB, header.getValue());
}
}
finally {
response.close();
}
}
protected abstract String getDestLink();
protected abstract String getPullConsumerUri();
protected abstract String getPushLink(String pushTarget);
public int postMessage(String content, String type) throws IOException {
String postUri;
String nextMsgUri = contextMap.get(KEY_MSG_CREATE_NEXT);
if (nextMsgUri == null) {
postUri = contextMap.get(KEY_MSG_CREATE);
}
else {
postUri = nextMsgUri;
}
CloseableHttpResponse response = connection.post(postUri, type, content);
int code = -1;
try {
code = ResponseUtil.getHttpCode(response);
// check redirection
if (code == 307) {
Header redirLoc = response.getFirstHeader("Location");
contextMap.put(KEY_MSG_CREATE_NEXT, redirLoc.getValue());
code = postMessage(content, type);// do it again.
}
else if (code == 201) {
Header header = response.getFirstHeader(KEY_MSG_CREATE_NEXT);
contextMap.put(KEY_MSG_CREATE_NEXT, header.getValue());
}
}
finally {
response.close();
}
return code;
}
public abstract void initPullConsumers() throws IOException;
public boolean acknowledgement(boolean ackValue) throws IOException {
String ackUri = contextMap.get(KEY_MSG_ACK);
if (ackUri != null) {
CloseableHttpResponse response = connection.post(ackUri, "application/x-www-form-urlencoded",
"acknowledge=" + ackValue);
int code = ResponseUtil.getHttpCode(response);
if (code == 200) {
contextMap.put(KEY_MSG_ACK_NEXT, response.getFirstHeader(KEY_MSG_ACK_NEXT).getValue());
}
return true;
}
return false;
}
public String pullMessage() throws IOException {
String message = null;
String msgPullUri = null;
if (autoAck) {
msgPullUri = contextMap.get(KEY_MSG_CONSUME_NEXT);
if (msgPullUri == null) {
initPullConsumers();
msgPullUri = contextMap.get(KEY_MSG_CONSUME_NEXT);
}
}
else {
msgPullUri = contextMap.get(KEY_MSG_ACK_NEXT);
if (msgPullUri == null) {
initPullConsumers();
msgPullUri = contextMap.get(KEY_MSG_ACK_NEXT);
}
}
CloseableHttpResponse response = connection.post(msgPullUri);
int code = ResponseUtil.getHttpCode(response);
try {
if (code == 200) {
// success
HttpEntity entity = response.getEntity();
long len = entity.getContentLength();
if (len != -1 && len < 1024000) {
message = EntityUtils.toString(entity);
}
else {
// drop message
System.err.println("Mesage too large, drop it " + len);
}
Header header = response.getFirstHeader(KEY_MSG_CONSUMER);
contextMap.put(KEY_MSG_CONSUMER, header.getValue());
if (!autoAck) {
header = response.getFirstHeader(KEY_MSG_ACK);
contextMap.put(KEY_MSG_ACK, header.getValue());
}
else {
header = response.getFirstHeader(KEY_MSG_CONSUME_NEXT);
contextMap.put(KEY_MSG_CONSUME_NEXT, header.getValue());
}
}
else if (code == 503) {
if (autoAck) {
contextMap.put(KEY_MSG_CONSUME_NEXT, response.getFirstHeader(KEY_MSG_CONSUME_NEXT).getValue());
}
else {
contextMap.put(KEY_MSG_ACK_NEXT, response.getFirstHeader(KEY_MSG_ACK_NEXT).getValue());
}
Header header = response.getFirstHeader("Retry-After");
if (header != null) {
long retryDelay = Long.valueOf(response.getFirstHeader("Retry-After").getValue());
try {
Thread.sleep(retryDelay);
}
catch (InterruptedException e) {
e.printStackTrace();
}
message = pullMessage();
}
}
else {
throw new IllegalStateException("error: " + ResponseUtil.getDetails(response));
}
}
finally {
response.close();
}
return message;
}
@Override
public void close() {
String consumerUri = contextMap.get(KEY_MSG_CONSUMER);
if (consumerUri != null) {
try {
connection.delete(consumerUri);
contextMap.remove(KEY_MSG_CONSUMER);
}
catch (ClientProtocolException e) {
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
public void setUpPush(String pushTarget) throws Exception {
String pushLink = this.contextMap.get(KEY_MSG_PUSH);
String pushRegXml = "<push-registration>" +
"<link rel=\"destination\" href=\"" +
this.connection.getTargetUri() +
this.getPushLink(pushTarget) + "\"/>" +
"</push-registration>";
CloseableHttpResponse response = connection.post(pushLink, "application/xml", pushRegXml);
int code = ResponseUtil.getHttpCode(response);
try {
if (code != 201) {
System.out.println("Failed to push " + pushRegXml);
System.out.println("Location: " + pushLink);
throw new Exception("Failed to register push " + ResponseUtil.getDetails(response));
}
}
finally {
response.close();
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.rest.util;
import java.io.IOException;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
public class TopicRestMessageContext extends RestMessageContext {
public static final String PREFIX_TOPIC = "/topics/jms.topic.";
private boolean durableSub;
public TopicRestMessageContext(RestAMQConnection restAMQConnection, String topic, boolean durable) throws IOException {
super(restAMQConnection, topic);
this.durableSub = durable;
}
@Override
protected String getDestLink() {
return PREFIX_TOPIC + destination;
}
@Override
protected String getPullConsumerUri() {
return getDestLink() + "/pull-subscriptions";
}
@Override
public void initPullConsumers() throws IOException {
String pullUri = getPullConsumerUri();
CloseableHttpResponse response = null;
if (this.durableSub || !this.autoAck) {
String extraOpt = "durable=" + this.durableSub + "&autoAck=" + this.autoAck;
response = connection.post(pullUri, "application/x-www-form-urlencoded", extraOpt);
}
else {
response = connection.post(pullUri);
}
int code = ResponseUtil.getHttpCode(response);
try {
if (code == 201) {
Header header = response.getFirstHeader("Location");
contextMap.put(KEY_PULL_CONSUMERS_LOC, header.getValue());
header = response.getFirstHeader(KEY_MSG_CONSUME_NEXT);
contextMap.put(KEY_MSG_CONSUME_NEXT, header.getValue());
header = response.getFirstHeader(KEY_MSG_ACK_NEXT);
if (header != null) {
contextMap.put(KEY_MSG_ACK_NEXT, header.getValue());
}
}
else {
throw new IllegalStateException("Failed to init pull consumer " + ResponseUtil.getDetails(response));
}
}
finally {
response.close();
}
}
@Override
protected String getPushLink(String pushTarget) {
return PREFIX_TOPIC + pushTarget;
}
}

View File

@ -0,0 +1,37 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>rest-test-bwlist</id>
<formats>
<format>war</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>../../artemis-rest/target/classes/</directory>
<outputDirectory>/WEB-INF/classes</outputDirectory>
</fileSet>
<fileSet>
<directory>src/test/resources/rest/rest-test-bwlist/webapp/</directory>
<outputDirectory>/</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@ -0,0 +1,37 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>rest-test</id>
<formats>
<format>war</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>../../artemis-rest/target/classes/</directory>
<outputDirectory>/WEB-INF/classes</outputDirectory>
</fileSet>
<fileSet>
<directory>src/test/resources/rest/rest-test/webapp/</directory>
<outputDirectory>/</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@ -0,0 +1,56 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd">
<web-app>
<context-param>
<param-name>org.apache.activemq.artemis.jms.deserialization.whitelist</param-name>
<param-value>some.other.package</param-value>
</context-param>
<context-param>
<param-name>org.apache.activemq.artemis.jms.deserialization.blacklist</param-name>
<param-value>org.apache.activemq.artemis.example.rest.Order</param-value>
</context-param>
<filter>
<filter-name>Rest-Messaging</filter-name>
<filter-class>
org.jboss.resteasy.plugins.server.servlet.FilterDispatcher
</filter-class>
</filter>
<filter-mapping>
<filter-name>Rest-Messaging</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<listener>
<listener-class>org.jboss.resteasy.plugins.server.servlet.ResteasyBootstrap</listener-class>
</listener>
<listener>
<listener-class>org.apache.activemq.artemis.rest.integration.RestMessagingBootstrapListener</listener-class>
</listener>
</web-app>

View File

@ -0,0 +1,58 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd">
<web-app>
<!--
<context-param>
<param-name>org.apache.activemq.artemis.jms.deserialization.whitelist</param-name>
<param-value>some.other.package</param-value>
</context-param>
<context-param>
<param-name>org.apache.activemq.artemis.jms.deserialization.blacklist</param-name>
<param-value>org.apache.activemq.artemis.example.rest.Order</param-value>
</context-param>
-->
<filter>
<filter-name>Rest-Messaging</filter-name>
<filter-class>
org.jboss.resteasy.plugins.server.servlet.FilterDispatcher
</filter-class>
</filter>
<filter-mapping>
<filter-name>Rest-Messaging</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<listener>
<listener-class>org.jboss.resteasy.plugins.server.servlet.ResteasyBootstrap</listener-class>
</listener>
<listener>
<listener-class>org.apache.activemq.artemis.rest.integration.RestMessagingBootstrapListener</listener-class>
</listener>
</web-app>