diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQClientResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQClientResource.java new file mode 100644 index 0000000000..fd7d5f8f4f --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQClientResource.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.io.Serializable; +import java.net.URI; +import java.util.Map; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractActiveMQClientResource extends ExternalResource { + Logger log = LoggerFactory.getLogger(this.getClass()); + + ActiveMQConnectionFactory connectionFactory; + Connection connection; + Session session; + ActiveMQDestination destination; + + public AbstractActiveMQClientResource(ActiveMQConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + public AbstractActiveMQClientResource(URI brokerURI) { + this(new ActiveMQConnectionFactory(brokerURI)); + } + + public AbstractActiveMQClientResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) { + this(embeddedActiveMQBroker.createConnectionFactory()); + } + + public AbstractActiveMQClientResource(URI brokerURI, String userName, String password) { + this(new ActiveMQConnectionFactory(userName, password, brokerURI)); + } + + public AbstractActiveMQClientResource(String destinationName, ActiveMQConnectionFactory connectionFactory) { + this(connectionFactory); + destination = createDestination(destinationName); + } + + public AbstractActiveMQClientResource(String destinationName, URI brokerURI) { + this(destinationName, new ActiveMQConnectionFactory(brokerURI)); + } + + public AbstractActiveMQClientResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + this(destinationName, embeddedActiveMQBroker.createConnectionFactory()); + } + + public AbstractActiveMQClientResource(String destinationName, URI brokerURI, String userName, String password) { + this(destinationName, new ActiveMQConnectionFactory(userName, password, brokerURI)); + } + + public static void setMessageProperties(Message message, Map properties) throws JMSException { + if (properties != null) { + for (Map.Entry property : properties.entrySet()) { + message.setObjectProperty(property.getKey(), property.getValue()); + } + } + } + + public String getClientId() { + return null; + } + + public String getDestinationName() { + return (destination != null) ? destination.toString() : null; + } + + public abstract byte getDestinationType(); + + protected abstract void createClient() throws JMSException; + + /** + * Start the Client + *

+ * Invoked by JUnit to setup the resource + */ + @Override + protected void before() throws Throwable { + log.info("Starting {}: {}", this.getClass().getSimpleName(), connectionFactory.getBrokerURL()); + + this.start(); + + super.before(); + } + + /** + * Stop the Client + *

+ * Invoked by JUnit to tear down the resource + */ + @Override + protected void after() { + log.info("Stopping {}: {}", this.getClass().getSimpleName(), connectionFactory.getBrokerURL()); + + super.after(); + + this.stop(); + } + + public void start() { + try { + try { + connection = connectionFactory.createConnection(); + String clientId = getClientId(); + if (clientId != null) { + connection.setClientID(clientId); + } + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + createClient(); + } catch (JMSException jmsEx) { + throw new RuntimeException("Producer initialization failed" + this.getClass().getSimpleName(), jmsEx); + } + connection.start(); + } catch (JMSException jmsEx) { + throw new IllegalStateException("Producer failed to start", jmsEx); + } + log.info("Ready to produce messages to {}", connectionFactory.getBrokerURL()); + } + + public void stop() { + try { + connection.close(); + } catch (JMSException jmsEx) { + log.warn("Exception encountered closing JMS Connection", jmsEx); + } + } + + public String getBrokerURL() { + return connectionFactory.getBrokerURL(); + } + + protected ActiveMQDestination createDestination(String destinationName) { + if (destinationName != null) { + return ActiveMQDestination.createDestination(destinationName, getDestinationType()); + } + + return null; + } + + public BytesMessage createBytesMessage() throws JMSException { + return session.createBytesMessage(); + } + + public TextMessage createTextMessage() throws JMSException { + return session.createTextMessage(); + } + + public MapMessage createMapMessage() throws JMSException { + return session.createMapMessage(); + } + + public ObjectMessage createObjectMessage() throws JMSException { + return session.createObjectMessage(); + } + + public StreamMessage createStreamMessage() throws JMSException { + return session.createStreamMessage(); + } + + public BytesMessage createMessage(byte[] body) throws JMSException { + return this.createMessage(body, null); + } + + public TextMessage createMessage(String body) throws JMSException { + return this.createMessage(body, null); + } + + public MapMessage createMessage(Map body) throws JMSException { + return this.createMessage(body, null); + } + + public ObjectMessage createMessage(Serializable body) throws JMSException { + return this.createMessage(body, null); + } + + public BytesMessage createMessage(byte[] body, Map properties) throws JMSException { + BytesMessage message = this.createBytesMessage(); + if (body != null) { + message.writeBytes(body); + } + + setMessageProperties(message, properties); + + return message; + } + + public TextMessage createMessage(String body, Map properties) throws JMSException { + TextMessage message = this.createTextMessage(); + if (body != null) { + message.setText(body); + } + + setMessageProperties(message, properties); + + return message; + } + + public MapMessage createMessage(Map body, Map properties) throws JMSException { + MapMessage message = this.createMapMessage(); + + if (body != null) { + for (Map.Entry entry : body.entrySet()) { + message.setObject(entry.getKey(), entry.getValue()); + } + } + + setMessageProperties(message, properties); + + return message; + } + + public ObjectMessage createMessage(Serializable body, Map properties) throws JMSException { + ObjectMessage message = this.createObjectMessage(); + + if (body != null) { + message.setObject(body); + } + + setMessageProperties(message, properties); + + return message; + } +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQConsumerResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQConsumerResource.java new file mode 100644 index 0000000000..5b04171138 --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQConsumerResource.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.net.URI; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; + +public abstract class AbstractActiveMQConsumerResource extends AbstractActiveMQClientResource { + MessageConsumer consumer; + long defaultReceiveTimout = 50; + + public AbstractActiveMQConsumerResource(String destinationName, ActiveMQConnectionFactory connectionFactory) { + super(destinationName, connectionFactory); + } + + public AbstractActiveMQConsumerResource(String destinationName, URI brokerURI) { + super(destinationName, brokerURI); + } + + public AbstractActiveMQConsumerResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public AbstractActiveMQConsumerResource(String destinationName, URI brokerURI, String userName, String password) { + super(destinationName, brokerURI, userName, password); + } + + public long getDefaultReceiveTimout() { + return defaultReceiveTimout; + } + + public void setDefaultReceiveTimout(long defaultReceiveTimout) { + this.defaultReceiveTimout = defaultReceiveTimout; + } + + @Override + protected void createClient() throws JMSException { + consumer = session.createConsumer(destination); + } + + public BytesMessage receiveBytesMessage() throws JMSException { + return (BytesMessage) this.receiveMessage(); + } + + public TextMessage receiveTextMessage() throws JMSException { + return (TextMessage) this.receiveMessage(); + } + + public MapMessage receiveMapMessage() throws JMSException { + return (MapMessage) this.receiveMessage(); + } + + public ObjectMessage receiveObjectMessage() throws JMSException { + return (ObjectMessage) this.receiveMessage(); + } + + public BytesMessage receiveBytesMessage(long timeout) throws JMSException { + return (BytesMessage) this.receiveMessage(timeout); + } + + public TextMessage receiveTextMessage(long timeout) throws JMSException { + return (TextMessage) this.receiveMessage(timeout); + } + + public MapMessage receiveMapMessage(long timeout) throws JMSException { + return (MapMessage) this.receiveMessage(timeout); + } + + public ObjectMessage receiveObjectMessage(long timeout) throws JMSException { + return (ObjectMessage) this.receiveMessage(timeout); + } + + public Message receiveMessage() throws JMSException { + return receiveMessage(defaultReceiveTimout); + } + + /** + * Receive a message with the given timeout + * + * @param timeout + * @return + * @throws JMSException + */ + public Message receiveMessage(long timeout) throws JMSException { + Message message = null; + if (timeout > 0) { + message = consumer.receive(timeout); + } else if (timeout == 0) { + message = consumer.receiveNoWait(); + } else { + message = consumer.receive(); + } + + return message; + } +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQProducerResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQProducerResource.java new file mode 100644 index 0000000000..69e89af8bc --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/AbstractActiveMQProducerResource.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.io.Serializable; +import java.net.URI; +import java.util.Map; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; + +public abstract class AbstractActiveMQProducerResource extends AbstractActiveMQClientResource { + MessageProducer producer; + + public AbstractActiveMQProducerResource(ActiveMQConnectionFactory connectionFactory) { + super(connectionFactory); + } + + public AbstractActiveMQProducerResource(URI brokerURI) { + super(brokerURI); + } + + public AbstractActiveMQProducerResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(embeddedActiveMQBroker); + } + + public AbstractActiveMQProducerResource(URI brokerURI, String userName, String password) { + super(brokerURI, userName, password); + } + + public AbstractActiveMQProducerResource(String destinationName, ActiveMQConnectionFactory connectionFactory) { + super(destinationName, connectionFactory); + } + + public AbstractActiveMQProducerResource(String destinationName, URI brokerURI) { + super(destinationName, brokerURI); + } + + public AbstractActiveMQProducerResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public AbstractActiveMQProducerResource(String destinationName, URI brokerURI, String userName, String password) { + super(destinationName, brokerURI, userName, password); + } + + @Override + public String getDestinationName() { + try { + if (producer != null && producer.getDestination() != null) { + return producer.getDestination().toString(); + } + } catch (JMSException e) { + // eat this + } + + return null; + } + + public void sendMessage(Message message) throws JMSException { + producer.send(message); + } + + public BytesMessage sendMessage(byte[] body) throws JMSException { + BytesMessage message = this.createMessage(body); + sendMessage(message); + return message; + } + + public TextMessage sendMessage(String body) throws JMSException { + TextMessage message = this.createMessage(body); + sendMessage(message); + return message; + } + + public MapMessage sendMessage(Map body) throws JMSException { + MapMessage message = this.createMessage(body); + sendMessage(message); + return message; + } + + public ObjectMessage sendMessage(Serializable body) throws JMSException { + ObjectMessage message = this.createMessage(body); + sendMessage(message); + return message; + } + + public BytesMessage sendMessageWithProperties(byte[] body, Map properties) throws JMSException { + BytesMessage message = this.createMessage(body, properties); + sendMessage(message); + return message; + } + + public TextMessage sendMessageWithProperties(String body, Map properties) throws JMSException { + TextMessage message = this.createMessage(body, properties); + sendMessage(message); + return message; + } + + public MapMessage sendMessageWithProperties(Map body, Map properties) throws JMSException { + MapMessage message = this.createMessage(body, properties); + sendMessage(message); + return message; + } + + public ObjectMessage sendMessageWithProperties(Serializable body, Map properties) throws JMSException { + ObjectMessage message = this.createMessage(body, properties); + sendMessage(message); + return message; + } + +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicQueueSenderResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicQueueSenderResource.java new file mode 100644 index 0000000000..1e355aed04 --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicQueueSenderResource.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.io.Serializable; +import java.net.URI; +import java.util.Map; +import javax.jms.BytesMessage; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; + +public class ActiveMQDynamicQueueSenderResource extends AbstractActiveMQProducerResource { + public ActiveMQDynamicQueueSenderResource(ActiveMQConnectionFactory connectionFactory) { + super(connectionFactory); + } + + public ActiveMQDynamicQueueSenderResource(URI brokerURI) { + super(brokerURI); + } + + public ActiveMQDynamicQueueSenderResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(embeddedActiveMQBroker); + } + + public ActiveMQDynamicQueueSenderResource(URI brokerURI, String userName, String password) { + super(brokerURI, userName, password); + } + + public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, ActiveMQConnectionFactory connectionFactory) { + super(defaultDestinationName, connectionFactory); + } + + public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, URI brokerURI) { + super(defaultDestinationName, brokerURI); + } + + public ActiveMQDynamicQueueSenderResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, URI brokerURI, String userName, String password) { + super(defaultDestinationName, brokerURI, userName, password); + } + + @Override + protected void createClient() throws JMSException { + producer = session.createProducer(null); + } + + @Override + public byte getDestinationType() { + return ActiveMQDestination.QUEUE_TYPE; + } + + @Override + public void sendMessage(Message message) throws JMSException { + if (destination == null) { + throw new IllegalStateException("Destination is not specified"); + } + + producer.send(destination, message); + } + + public void sendMessage(String destinationName, Message message) throws JMSException { + producer.send(createDestination(destinationName), message); + } + + public BytesMessage sendMessage(String destinationName, byte[] body) throws JMSException { + BytesMessage message = this.createMessage(body); + sendMessage(destinationName, message); + return message; + } + + public TextMessage sendMessage(String destinationName, String body) throws JMSException { + TextMessage message = this.createMessage(body); + sendMessage(destinationName, message); + return message; + } + + public MapMessage sendMessage(String destinationName, Map body) throws JMSException { + MapMessage message = this.createMessage(body); + sendMessage(destinationName, message); + return message; + } + + public ObjectMessage sendMessage(String destinationName, Serializable body) throws JMSException { + ObjectMessage message = this.createMessage(body); + sendMessage(destinationName, message); + return message; + } + + public BytesMessage sendMessageWithProperties(String destinationName, byte[] body, Map properties) throws JMSException { + BytesMessage message = this.createMessage(body, properties); + sendMessage(destinationName, message); + return message; + } + + public TextMessage sendMessageWithProperties(String destinationName, String body, Map properties) throws JMSException { + TextMessage message = this.createMessage(body, properties); + sendMessage(destinationName, message); + return message; + } + + public MapMessage sendMessageWithProperties(String destinationName, Map body, Map properties) throws JMSException { + MapMessage message = this.createMessage(body, properties); + sendMessage(destinationName, message); + return message; + } + + public ObjectMessage sendMessageWithProperties(String destinationName, Serializable body, Map properties) throws JMSException { + ObjectMessage message = this.createMessage(body, properties); + sendMessage(destinationName, message); + return message; + } + +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicTopicPublisherResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicTopicPublisherResource.java new file mode 100644 index 0000000000..8181946923 --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQDynamicTopicPublisherResource.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.io.Serializable; +import java.net.URI; +import java.util.Map; +import javax.jms.BytesMessage; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; + +public class ActiveMQDynamicTopicPublisherResource extends AbstractActiveMQProducerResource { + public ActiveMQDynamicTopicPublisherResource(ActiveMQConnectionFactory connectionFactory) { + super(connectionFactory); + } + + public ActiveMQDynamicTopicPublisherResource(URI brokerURI) { + super(brokerURI); + } + + public ActiveMQDynamicTopicPublisherResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(embeddedActiveMQBroker); + } + + public ActiveMQDynamicTopicPublisherResource(URI brokerURI, String userName, String password) { + super(brokerURI, userName, password); + } + + public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, ActiveMQConnectionFactory connectionFactory) { + super(defaultDestinationName, connectionFactory); + } + + public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, URI brokerURI) { + super(defaultDestinationName, brokerURI); + } + + public ActiveMQDynamicTopicPublisherResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, URI brokerURI, String userName, String password) { + super(defaultDestinationName, brokerURI, userName, password); + } + + @Override + protected void createClient() throws JMSException { + producer = session.createProducer(null); + } + + @Override + public byte getDestinationType() { + return ActiveMQDestination.TOPIC_TYPE; + } + + @Override + public void sendMessage(Message message) throws JMSException { + if (destination == null) { + throw new IllegalStateException("Destination is not specified"); + } + + producer.send(destination, message); + } + + public void sendMessage(String destinationName, Message message) throws JMSException { + producer.send(createDestination(destinationName), message); + } + + public BytesMessage sendMessage(String destinationName, byte[] body) throws JMSException { + BytesMessage message = this.createMessage(body); + sendMessage(destinationName, message); + return message; + } + + public TextMessage sendMessage(String destinationName, String body) throws JMSException { + TextMessage message = this.createMessage(body); + sendMessage(destinationName, message); + return message; + } + + public MapMessage sendMessage(String destinationName, Map body) throws JMSException { + MapMessage message = this.createMessage(body); + sendMessage(destinationName, message); + return message; + } + + public ObjectMessage sendMessage(String destinationName, Serializable body) throws JMSException { + ObjectMessage message = this.createMessage(body); + sendMessage(destinationName, message); + return message; + } + + public BytesMessage sendMessageWithProperties(String destinationName, byte[] body, Map properties) throws JMSException { + BytesMessage message = this.createMessage(body, properties); + sendMessage(destinationName, message); + return message; + } + + public TextMessage sendMessageWithProperties(String destinationName, String body, Map properties) throws JMSException { + TextMessage message = this.createMessage(body, properties); + sendMessage(destinationName, message); + return message; + } + + public MapMessage sendMessageWithProperties(String destinationName, Map body, Map properties) throws JMSException { + MapMessage message = this.createMessage(body, properties); + sendMessage(destinationName, message); + return message; + } + + public ObjectMessage sendMessageWithProperties(String destinationName, Serializable body, Map properties) throws JMSException { + ObjectMessage message = this.createMessage(body, properties); + sendMessage(destinationName, message); + return message; + } +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueReceiverResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueReceiverResource.java new file mode 100644 index 0000000000..a9748365a2 --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueReceiverResource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.net.URI; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; + +public class ActiveMQQueueReceiverResource extends AbstractActiveMQConsumerResource { + public ActiveMQQueueReceiverResource(String destinationName, ActiveMQConnectionFactory connectionFactory) { + super(destinationName, connectionFactory); + } + + public ActiveMQQueueReceiverResource(String destinationName, URI brokerURI) { + super(destinationName, brokerURI); + } + + public ActiveMQQueueReceiverResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public ActiveMQQueueReceiverResource(String destinationName, URI brokerURI, String userName, String password) { + super(destinationName, brokerURI, userName, password); + } + + @Override + public byte getDestinationType() { + return ActiveMQDestination.QUEUE_TYPE; + } +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueSenderResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueSenderResource.java new file mode 100644 index 0000000000..fa3cdcaf1c --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQQueueSenderResource.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.net.URI; +import javax.jms.JMSException; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; + +public class ActiveMQQueueSenderResource extends AbstractActiveMQProducerResource { + public ActiveMQQueueSenderResource(String destinationName, ActiveMQConnectionFactory connectionFactory) { + super(destinationName, connectionFactory); + } + + public ActiveMQQueueSenderResource(String destinationName, URI brokerURI) { + super(destinationName, brokerURI); + } + + public ActiveMQQueueSenderResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public ActiveMQQueueSenderResource(String destinationName, URI brokerURI, String userName, String password) { + super(destinationName, brokerURI, userName, password); + } + + @Override + public byte getDestinationType() { + return ActiveMQDestination.QUEUE_TYPE; + } + + @Override + protected void createClient() throws JMSException { + producer = session.createProducer(destination); + } + +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicDurableSubscriberResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicDurableSubscriberResource.java new file mode 100644 index 0000000000..58bb6a52f6 --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicDurableSubscriberResource.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.net.URI; +import javax.jms.JMSException; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; + +public class ActiveMQTopicDurableSubscriberResource extends AbstractActiveMQConsumerResource { + String clientId = "test-client-id"; + String subscriberName = "test-subscriber"; + + public ActiveMQTopicDurableSubscriberResource(String destinationName, ActiveMQConnectionFactory connectionFactory) { + super(destinationName, connectionFactory); + } + + public ActiveMQTopicDurableSubscriberResource(String destinationName, URI brokerURI) { + super(destinationName, brokerURI); + } + + public ActiveMQTopicDurableSubscriberResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public ActiveMQTopicDurableSubscriberResource(String destinationName, URI brokerURI, String userName, String password) { + super(destinationName, brokerURI, userName, password); + } + + @Override + public byte getDestinationType() { + return ActiveMQDestination.TOPIC_TYPE; + } + + @Override + protected void createClient() throws JMSException { + consumer = session.createDurableSubscriber((Topic) destination, subscriberName); + } + + @Override + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getSubscriberName() { + return subscriberName; + } + + public void setSubscriberName(String subscriberName) { + this.subscriberName = subscriberName; + } +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicPublisherResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicPublisherResource.java new file mode 100644 index 0000000000..0def0e9d0b --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicPublisherResource.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.net.URI; +import javax.jms.JMSException; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; + +public class ActiveMQTopicPublisherResource extends AbstractActiveMQProducerResource { + public ActiveMQTopicPublisherResource(String destinationName, ActiveMQConnectionFactory connectionFactory) { + super(destinationName, connectionFactory); + } + + public ActiveMQTopicPublisherResource(String destinationName, URI brokerURI) { + super(destinationName, brokerURI); + } + + public ActiveMQTopicPublisherResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public ActiveMQTopicPublisherResource(String destinationName, URI brokerURI, String userName, String password) { + super(destinationName, brokerURI, userName, password); + } + + @Override + public String getDestinationName() { + try { + if (producer != null && producer.getDestination() != null) { + return producer.getDestination().toString(); + } + } catch (JMSException e) { + // eat this + } + + return null; + } + + @Override + public byte getDestinationType() { + return ActiveMQDestination.TOPIC_TYPE; + } + + @Override + protected void createClient() throws JMSException { + producer = session.createProducer(destination); + } +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicSubscriberResource.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicSubscriberResource.java new file mode 100644 index 0000000000..547ff1146c --- /dev/null +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/ActiveMQTopicSubscriberResource.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.junit; + +import java.net.URI; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; + +public class ActiveMQTopicSubscriberResource extends AbstractActiveMQConsumerResource { + public ActiveMQTopicSubscriberResource(String destinationName, ActiveMQConnectionFactory connectionFactory) { + super(destinationName, connectionFactory); + } + + public ActiveMQTopicSubscriberResource(String destinationName, URI brokerURI) { + super(destinationName, brokerURI); + } + + public ActiveMQTopicSubscriberResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) { + super(destinationName, embeddedActiveMQBroker); + } + + public ActiveMQTopicSubscriberResource(String destinationName, URI brokerURI, String userName, String password) { + super(destinationName, brokerURI, userName, password); + } + + @Override + public byte getDestinationType() { + return ActiveMQDestination.TOPIC_TYPE; + } + +} diff --git a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java index 3e328e8483..d1877ba445 100644 --- a/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java +++ b/activemq-tooling/activemq-junit/src/main/java/org/apache/activemq/junit/EmbeddedActiveMQBroker.java @@ -16,23 +16,37 @@ */ package org.apache.activemq.junit; +import java.io.Serializable; import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.plugin.StatisticsBrokerPlugin; import org.apache.activemq.pool.PooledConnectionFactory; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE; + /** * A JUnit Rule that embeds an ActiveMQ broker into a test. */ @@ -40,15 +54,15 @@ public class EmbeddedActiveMQBroker extends ExternalResource { Logger log = LoggerFactory.getLogger(this.getClass()); BrokerService brokerService; + InternalClient internalClient; /** * Create an embedded ActiveMQ broker using defaults - * + *

* The defaults are: - * - the broker name is 'embedded-broker' - * - JMX is disabled - * - Persistence is disabled - * + * - the broker name is 'embedded-broker' + * - JMX is disabled + * - Persistence is disabled */ public EmbeddedActiveMQBroker() { brokerService = new BrokerService(); @@ -61,7 +75,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource { /** * Create an embedded ActiveMQ broker using a configuration URI */ - public EmbeddedActiveMQBroker(String configurationURI ) { + public EmbeddedActiveMQBroker(String configurationURI) { try { brokerService = BrokerFactory.createBroker(configurationURI); } catch (Exception ex) { @@ -72,7 +86,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource { /** * Create an embedded ActiveMQ broker using a configuration URI */ - public EmbeddedActiveMQBroker(URI configurationURI ) { + public EmbeddedActiveMQBroker(URI configurationURI) { try { brokerService = BrokerFactory.createBroker(configurationURI); } catch (Exception ex) { @@ -80,13 +94,26 @@ public class EmbeddedActiveMQBroker extends ExternalResource { } } + public static void setMessageProperties(Message message, Map properties) { + if (properties != null && properties.size() > 0) { + for (Map.Entry property : properties.entrySet()) { + try { + message.setObjectProperty(property.getKey(), property.getValue()); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException(String.format("Failed to set property {%s = %s}", property.getKey(), property.getValue().toString()), jmsEx); + } + } + } + } + /** * Customize the configuration of the embedded ActiveMQ broker - * + *

* This method is called before the embedded ActiveMQ broker is started, and can * be overridden to this method to customize the broker configuration. */ - protected void configure() {} + protected void configure() { + } /** * Start the embedded ActiveMQ broker, blocking until the broker has successfully started. @@ -98,6 +125,8 @@ public class EmbeddedActiveMQBroker extends ExternalResource { try { this.configure(); brokerService.start(); + internalClient = new InternalClient(); + internalClient.start(); } catch (Exception ex) { throw new RuntimeException("Exception encountered starting embedded ActiveMQ broker: {}" + this.getBrokerName(), ex); } @@ -112,6 +141,10 @@ public class EmbeddedActiveMQBroker extends ExternalResource { * be stopped manually to support advanced testing scenarios. */ public void stop() { + if (internalClient != null) { + internalClient.stop(); + internalClient = null; + } if (!brokerService.isStopped()) { try { brokerService.stop(); @@ -158,7 +191,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource { */ public ActiveMQConnectionFactory createConnectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); - connectionFactory.setBrokerURL(brokerService.getVmConnectorURI().toString()); + connectionFactory.setBrokerURL(getVmURL()); return connectionFactory; } @@ -187,15 +220,64 @@ public class EmbeddedActiveMQBroker extends ExternalResource { } /** - * Get the VM URL for the embedded ActiveMQ Broker + * Get the failover VM URL for the embedded ActiveMQ Broker *

- * NOTE: The option is precreate=false option is appended to the URL to avoid the automatic creation of brokers + * NOTE: The create=false option is appended to the URL to avoid the automatic creation of brokers * and the resulting duplicate broker errors * * @return the VM URL for the embedded broker */ public String getVmURL() { - return String.format("failover:(%s?create=false)", brokerService.getVmConnectorURI().toString()); + return getVmURL(true); + } + + /** + * Get the VM URL for the embedded ActiveMQ Broker + *

+ * NOTE: The create=false option is appended to the URL to avoid the automatic creation of brokers + * and the resulting duplicate broker errors + * + * @param failoverURL if true a failover URL will be returned + * @return the VM URL for the embedded broker + */ + public String getVmURL(boolean failoverURL) { + if (failoverURL) { + return String.format("failover:(%s?create=false)", brokerService.getVmConnectorURI().toString()); + } + + return brokerService.getVmConnectorURI().toString() + "?create=false"; + } + + /** + * Get the failover VM URI for the embedded ActiveMQ Broker + *

+ * NOTE: The create=false option is appended to the URI to avoid the automatic creation of brokers + * and the resulting duplicate broker errors + * + * @return the VM URI for the embedded broker + */ + public URI getVmURI() { + return getVmURI(true); + } + + /** + * Get the VM URI for the embedded ActiveMQ Broker + *

+ * NOTE: The create=false option is appended to the URI to avoid the automatic creation of brokers + * and the resulting duplicate broker errors + * + * @param failoverURI if true a failover URI will be returned + * @return the VM URI for the embedded broker + */ + public URI getVmURI(boolean failoverURI) { + URI result; + try { + result = new URI(getVmURL(failoverURI)); + } catch (URISyntaxException uriEx) { + throw new RuntimeException("Unable to create failover URI", uriEx); + } + + return result; } /** @@ -326,64 +408,53 @@ public class EmbeddedActiveMQBroker extends ExternalResource { /** * Get the number of messages in a specific JMS Destination. *

- * The full name of the JMS destination including the prefix should be provided - i.e. queue:myQueue - * or topic:myTopic. If the destination type prefix is not included in the destination name, a prefix - * of "queue:" is assumed. + * The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue + * or topic://myTopic. If the destination type prefix is not included in the destination name, a prefix + * of "queue://" is assumed. * - * @param fullDestinationName the full name of the JMS Destination + * @param destinationName the full name of the JMS Destination * @return the number of messages in the JMS Destination */ - public int getMessageCount(String fullDestinationName) throws Exception { - final int QUEUE_TYPE = 1; - final int TOPIC_TYPE = 2; - + public long getMessageCount(String destinationName) { if (null == brokerService) { throw new IllegalStateException("BrokerService has not yet been created - was before() called?"); } - int destinationType = QUEUE_TYPE; - String destinationName = fullDestinationName; - - if (fullDestinationName.startsWith("queue:")) { - destinationName = fullDestinationName.substring(fullDestinationName.indexOf(':') + 1); - } else if (fullDestinationName.startsWith("topic:")) { - destinationType = TOPIC_TYPE; - destinationName = fullDestinationName.substring(fullDestinationName.indexOf(':') + 1); + // TODO: Figure out how to do this for Topics + Destination destination = getDestination(destinationName); + if (destination == null) { + throw new RuntimeException("Failed to find destination: " + destinationName); } - int messageCount = -1; - boolean foundDestination = false; - for (Destination destination : brokerService.getBroker().getDestinationMap().values()) { - String tmpName = destination.getName(); - if (tmpName.equalsIgnoreCase(destinationName)) { - switch (destinationType) { - case QUEUE_TYPE: - if (destination instanceof Queue) { - messageCount = destination.getMessageStore().getMessageCount(); - foundDestination = true; - } - break; - case TOPIC_TYPE: - if (destination instanceof Topic) { - messageCount = destination.getMessageStore().getMessageCount(); - foundDestination = true; - } - break; - default: - // Should never see this - log.error("Type didn't match: {}", destination.getClass().getName()); - } - } - if (foundDestination) { - break; - } + // return destination.getMessageStore().getMessageCount(); + return destination.getDestinationStatistics().getMessages().getCount(); + } + + /** + * Get the ActiveMQ destination + *

+ * The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue + * or topic://myTopic. If the destination type prefix is not included in the destination name, a prefix + * of "queue://" is assumed. + * + * @param destinationName the full name of the JMS Destination + * @return the ActiveMQ destination, null if not found + */ + public Destination getDestination(String destinationName) { + if (null == brokerService) { + throw new IllegalStateException("BrokerService has not yet been created - was before() called?"); } - if (!foundDestination) { - log.warn("Didn't find destination {} in broker {}", fullDestinationName, getBrokerName()); + Destination destination = null; + try { + destination = brokerService.getDestination(ActiveMQDestination.createDestination(destinationName, QUEUE_TYPE)); + } catch (RuntimeException runtimeEx) { + throw runtimeEx; + } catch (Exception ex) { + throw new EmbeddedActiveMQBrokerException("Unexpected exception getting destination from broker", ex); } - return messageCount; + return destination; } private PolicyEntry getDefaultPolicyEntry() { @@ -401,4 +472,323 @@ public class EmbeddedActiveMQBroker extends ExternalResource { return defaultEntry; } + + public BytesMessage createBytesMessage() { + return internalClient.createBytesMessage(); + } + + public TextMessage createTextMessage() { + return internalClient.createTextMessage(); + } + + public MapMessage createMapMessage() { + return internalClient.createMapMessage(); + } + + public ObjectMessage createObjectMessage() { + return internalClient.createObjectMessage(); + } + + public StreamMessage createStreamMessage() { + return internalClient.createStreamMessage(); + } + + public BytesMessage createMessage(byte[] body) { + return this.createMessage(body, null); + } + + public TextMessage createMessage(String body) { + return this.createMessage(body, null); + } + + public MapMessage createMessage(Map body) { + return this.createMessage(body, null); + } + + public ObjectMessage createMessage(Serializable body) { + return this.createMessage(body, null); + } + + public BytesMessage createMessage(byte[] body, Map properties) { + BytesMessage message = this.createBytesMessage(); + if (body != null) { + try { + message.writeBytes(body); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on BytesMessage", new String(body)), jmsEx); + } + } + + setMessageProperties(message, properties); + + return message; + } + + public TextMessage createMessage(String body, Map properties) { + TextMessage message = this.createTextMessage(); + if (body != null) { + try { + message.setText(body); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on TextMessage", body), jmsEx); + } + } + + setMessageProperties(message, properties); + + return message; + } + + public MapMessage createMessage(Map body, Map properties) { + MapMessage message = this.createMapMessage(); + + if (body != null) { + for (Map.Entry entry : body.entrySet()) { + try { + message.setObject(entry.getKey(), entry.getValue()); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body entry {%s = %s} on MapMessage", entry.getKey(), entry.getValue().toString()), jmsEx); + } + } + } + + setMessageProperties(message, properties); + + return message; + } + + public ObjectMessage createMessage(Serializable body, Map properties) { + ObjectMessage message = this.createObjectMessage(); + + if (body != null) { + try { + message.setObject(body); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on ObjectMessage", body.toString()), jmsEx); + } + } + + setMessageProperties(message, properties); + + return message; + } + + public void pushMessage(String destinationName, Message message) { + if (destinationName == null) { + throw new IllegalArgumentException("pushMessage failure - destination name is required"); + } else if (message == null) { + throw new IllegalArgumentException("pushMessage failure - a Message is required"); + } + ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); + + internalClient.pushMessage(destination, message); + } + + public BytesMessage pushMessage(String destinationName, byte[] body) { + BytesMessage message = createMessage(body, null); + pushMessage(destinationName, message); + return message; + } + + public TextMessage pushMessage(String destinationName, String body) { + TextMessage message = createMessage(body, null); + pushMessage(destinationName, message); + return message; + } + + public MapMessage pushMessage(String destinationName, Map body) { + MapMessage message = createMessage(body, null); + pushMessage(destinationName, message); + return message; + } + + public ObjectMessage pushMessage(String destinationName, Serializable body) { + ObjectMessage message = createMessage(body, null); + pushMessage(destinationName, message); + return message; + } + + public BytesMessage pushMessageWithProperties(String destinationName, byte[] body, Map properties) { + BytesMessage message = createMessage(body, properties); + pushMessage(destinationName, message); + return message; + } + + public TextMessage pushMessageWithProperties(String destinationName, String body, Map properties) { + TextMessage message = createMessage(body, properties); + pushMessage(destinationName, message); + return message; + } + + public MapMessage pushMessageWithProperties(String destinationName, Map body, Map properties) { + MapMessage message = createMessage(body, properties); + pushMessage(destinationName, message); + return message; + } + + public ObjectMessage pushMessageWithProperties(String destinationName, Serializable body, Map properties) { + ObjectMessage message = createMessage(body, properties); + pushMessage(destinationName, message); + return message; + } + + + public Message peekMessage(String destinationName) { + if (null == brokerService) { + throw new NullPointerException("peekMessage failure - BrokerService is null"); + } + + if (destinationName == null) { + throw new IllegalArgumentException("peekMessage failure - destination name is required"); + } + + ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); + Destination brokerDestination = null; + + try { + brokerDestination = brokerService.getDestination(destination); + } catch (Exception ex) { + throw new EmbeddedActiveMQBrokerException("peekMessage failure - unexpected exception getting destination from BrokerService", ex); + } + + if (brokerDestination == null) { + throw new IllegalStateException(String.format("peekMessage failure - destination %s not found in broker %s", destination.toString(), brokerService.getBrokerName())); + } + + org.apache.activemq.command.Message[] messages = brokerDestination.browse(); + if (messages != null && messages.length > 0) { + return (Message) messages[0]; + } + + return null; + } + + public BytesMessage peekBytesMessage(String destinationName) { + return (BytesMessage) peekMessage(destinationName); + } + + public TextMessage peekTextMessage(String destinationName) { + return (TextMessage) peekMessage(destinationName); + } + + public MapMessage peekMapMessage(String destinationName) { + return (MapMessage) peekMessage(destinationName); + } + + public ObjectMessage peekObjectMessage(String destinationName) { + return (ObjectMessage) peekMessage(destinationName); + } + + public StreamMessage peekStreamMessage(String destinationName) { + return (StreamMessage) peekMessage(destinationName); + } + + public static class EmbeddedActiveMQBrokerException extends RuntimeException { + public EmbeddedActiveMQBrokerException(String message) { + super(message); + } + + public EmbeddedActiveMQBrokerException(String message, Exception cause) { + super(message, cause); + } + } + + private class InternalClient { + ActiveMQConnectionFactory connectionFactory; + Connection connection; + Session session; + MessageProducer producer; + + public InternalClient() { + } + + void start() { + connectionFactory = createConnectionFactory(); + try { + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(null); + connection.start(); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException("Internal Client creation failure", jmsEx); + } + } + + void stop() { + if (null != connection) { + try { + connection.close(); + } catch (JMSException jmsEx) { + log.warn("JMSException encounter closing InternalClient connection - ignoring", jmsEx); + } + } + } + + public BytesMessage createBytesMessage() { + checkSession(); + + try { + return session.createBytesMessage(); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException("Failed to create BytesMessage", jmsEx); + } + } + + public TextMessage createTextMessage() { + checkSession(); + + try { + return session.createTextMessage(); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException("Failed to create TextMessage", jmsEx); + } + } + + public MapMessage createMapMessage() { + checkSession(); + + try { + return session.createMapMessage(); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException("Failed to create MapMessage", jmsEx); + } + } + + public ObjectMessage createObjectMessage() { + checkSession(); + + try { + return session.createObjectMessage(); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException("Failed to create ObjectMessage", jmsEx); + } + } + + public StreamMessage createStreamMessage() { + checkSession(); + try { + return session.createStreamMessage(); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException("Failed to create StreamMessage", jmsEx); + } + } + + public void pushMessage(ActiveMQDestination destination, Message message) { + if (producer == null) { + throw new IllegalStateException("JMS MessageProducer is null - has the InternalClient been started?"); + } + + try { + producer.send(destination, message); + } catch (JMSException jmsEx) { + throw new EmbeddedActiveMQBrokerException(String.format("Failed to push %s to %s", message.getClass().getSimpleName(), destination.toString()), jmsEx); + } + } + + void checkSession() { + if (session == null) { + throw new IllegalStateException("JMS Session is null - has the InternalClient been started?"); + } + } + } }