AMQ-6428 - Added convience methods to EmbeddedActiveMQBroker and JUnit Resources for ActiveMQ clients

This commit is contained in:
Quinn Stevenson 2016-10-03 12:31:31 -06:00
parent 3239e4f79e
commit bab4a92d6e
11 changed files with 1502 additions and 60 deletions

View File

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.io.Serializable;
import java.net.URI;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractActiveMQClientResource extends ExternalResource {
Logger log = LoggerFactory.getLogger(this.getClass());
ActiveMQConnectionFactory connectionFactory;
Connection connection;
Session session;
ActiveMQDestination destination;
public AbstractActiveMQClientResource(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public AbstractActiveMQClientResource(URI brokerURI) {
this(new ActiveMQConnectionFactory(brokerURI));
}
public AbstractActiveMQClientResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) {
this(embeddedActiveMQBroker.createConnectionFactory());
}
public AbstractActiveMQClientResource(URI brokerURI, String userName, String password) {
this(new ActiveMQConnectionFactory(userName, password, brokerURI));
}
public AbstractActiveMQClientResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
this(connectionFactory);
destination = createDestination(destinationName);
}
public AbstractActiveMQClientResource(String destinationName, URI brokerURI) {
this(destinationName, new ActiveMQConnectionFactory(brokerURI));
}
public AbstractActiveMQClientResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
this(destinationName, embeddedActiveMQBroker.createConnectionFactory());
}
public AbstractActiveMQClientResource(String destinationName, URI brokerURI, String userName, String password) {
this(destinationName, new ActiveMQConnectionFactory(userName, password, brokerURI));
}
public static void setMessageProperties(Message message, Map<String, Object> properties) throws JMSException {
if (properties != null) {
for (Map.Entry<String, Object> property : properties.entrySet()) {
message.setObjectProperty(property.getKey(), property.getValue());
}
}
}
public String getClientId() {
return null;
}
public String getDestinationName() {
return (destination != null) ? destination.toString() : null;
}
public abstract byte getDestinationType();
protected abstract void createClient() throws JMSException;
/**
* Start the Client
* <p/>
* Invoked by JUnit to setup the resource
*/
@Override
protected void before() throws Throwable {
log.info("Starting {}: {}", this.getClass().getSimpleName(), connectionFactory.getBrokerURL());
this.start();
super.before();
}
/**
* Stop the Client
* <p/>
* Invoked by JUnit to tear down the resource
*/
@Override
protected void after() {
log.info("Stopping {}: {}", this.getClass().getSimpleName(), connectionFactory.getBrokerURL());
super.after();
this.stop();
}
public void start() {
try {
try {
connection = connectionFactory.createConnection();
String clientId = getClientId();
if (clientId != null) {
connection.setClientID(clientId);
}
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
createClient();
} catch (JMSException jmsEx) {
throw new RuntimeException("Producer initialization failed" + this.getClass().getSimpleName(), jmsEx);
}
connection.start();
} catch (JMSException jmsEx) {
throw new IllegalStateException("Producer failed to start", jmsEx);
}
log.info("Ready to produce messages to {}", connectionFactory.getBrokerURL());
}
public void stop() {
try {
connection.close();
} catch (JMSException jmsEx) {
log.warn("Exception encountered closing JMS Connection", jmsEx);
}
}
public String getBrokerURL() {
return connectionFactory.getBrokerURL();
}
protected ActiveMQDestination createDestination(String destinationName) {
if (destinationName != null) {
return ActiveMQDestination.createDestination(destinationName, getDestinationType());
}
return null;
}
public BytesMessage createBytesMessage() throws JMSException {
return session.createBytesMessage();
}
public TextMessage createTextMessage() throws JMSException {
return session.createTextMessage();
}
public MapMessage createMapMessage() throws JMSException {
return session.createMapMessage();
}
public ObjectMessage createObjectMessage() throws JMSException {
return session.createObjectMessage();
}
public StreamMessage createStreamMessage() throws JMSException {
return session.createStreamMessage();
}
public BytesMessage createMessage(byte[] body) throws JMSException {
return this.createMessage(body, null);
}
public TextMessage createMessage(String body) throws JMSException {
return this.createMessage(body, null);
}
public MapMessage createMessage(Map<String, Object> body) throws JMSException {
return this.createMessage(body, null);
}
public ObjectMessage createMessage(Serializable body) throws JMSException {
return this.createMessage(body, null);
}
public BytesMessage createMessage(byte[] body, Map<String, Object> properties) throws JMSException {
BytesMessage message = this.createBytesMessage();
if (body != null) {
message.writeBytes(body);
}
setMessageProperties(message, properties);
return message;
}
public TextMessage createMessage(String body, Map<String, Object> properties) throws JMSException {
TextMessage message = this.createTextMessage();
if (body != null) {
message.setText(body);
}
setMessageProperties(message, properties);
return message;
}
public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) throws JMSException {
MapMessage message = this.createMapMessage();
if (body != null) {
for (Map.Entry<String, Object> entry : body.entrySet()) {
message.setObject(entry.getKey(), entry.getValue());
}
}
setMessageProperties(message, properties);
return message;
}
public ObjectMessage createMessage(Serializable body, Map<String, Object> properties) throws JMSException {
ObjectMessage message = this.createObjectMessage();
if (body != null) {
message.setObject(body);
}
setMessageProperties(message, properties);
return message;
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.net.URI;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public abstract class AbstractActiveMQConsumerResource extends AbstractActiveMQClientResource {
MessageConsumer consumer;
long defaultReceiveTimout = 50;
public AbstractActiveMQConsumerResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
super(destinationName, connectionFactory);
}
public AbstractActiveMQConsumerResource(String destinationName, URI brokerURI) {
super(destinationName, brokerURI);
}
public AbstractActiveMQConsumerResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public AbstractActiveMQConsumerResource(String destinationName, URI brokerURI, String userName, String password) {
super(destinationName, brokerURI, userName, password);
}
public long getDefaultReceiveTimout() {
return defaultReceiveTimout;
}
public void setDefaultReceiveTimout(long defaultReceiveTimout) {
this.defaultReceiveTimout = defaultReceiveTimout;
}
@Override
protected void createClient() throws JMSException {
consumer = session.createConsumer(destination);
}
public BytesMessage receiveBytesMessage() throws JMSException {
return (BytesMessage) this.receiveMessage();
}
public TextMessage receiveTextMessage() throws JMSException {
return (TextMessage) this.receiveMessage();
}
public MapMessage receiveMapMessage() throws JMSException {
return (MapMessage) this.receiveMessage();
}
public ObjectMessage receiveObjectMessage() throws JMSException {
return (ObjectMessage) this.receiveMessage();
}
public BytesMessage receiveBytesMessage(long timeout) throws JMSException {
return (BytesMessage) this.receiveMessage(timeout);
}
public TextMessage receiveTextMessage(long timeout) throws JMSException {
return (TextMessage) this.receiveMessage(timeout);
}
public MapMessage receiveMapMessage(long timeout) throws JMSException {
return (MapMessage) this.receiveMessage(timeout);
}
public ObjectMessage receiveObjectMessage(long timeout) throws JMSException {
return (ObjectMessage) this.receiveMessage(timeout);
}
public Message receiveMessage() throws JMSException {
return receiveMessage(defaultReceiveTimout);
}
/**
* Receive a message with the given timeout
*
* @param timeout
* @return
* @throws JMSException
*/
public Message receiveMessage(long timeout) throws JMSException {
Message message = null;
if (timeout > 0) {
message = consumer.receive(timeout);
} else if (timeout == 0) {
message = consumer.receiveNoWait();
} else {
message = consumer.receive();
}
return message;
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.io.Serializable;
import java.net.URI;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public abstract class AbstractActiveMQProducerResource extends AbstractActiveMQClientResource {
MessageProducer producer;
public AbstractActiveMQProducerResource(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
}
public AbstractActiveMQProducerResource(URI brokerURI) {
super(brokerURI);
}
public AbstractActiveMQProducerResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(embeddedActiveMQBroker);
}
public AbstractActiveMQProducerResource(URI brokerURI, String userName, String password) {
super(brokerURI, userName, password);
}
public AbstractActiveMQProducerResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
super(destinationName, connectionFactory);
}
public AbstractActiveMQProducerResource(String destinationName, URI brokerURI) {
super(destinationName, brokerURI);
}
public AbstractActiveMQProducerResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public AbstractActiveMQProducerResource(String destinationName, URI brokerURI, String userName, String password) {
super(destinationName, brokerURI, userName, password);
}
@Override
public String getDestinationName() {
try {
if (producer != null && producer.getDestination() != null) {
return producer.getDestination().toString();
}
} catch (JMSException e) {
// eat this
}
return null;
}
public void sendMessage(Message message) throws JMSException {
producer.send(message);
}
public BytesMessage sendMessage(byte[] body) throws JMSException {
BytesMessage message = this.createMessage(body);
sendMessage(message);
return message;
}
public TextMessage sendMessage(String body) throws JMSException {
TextMessage message = this.createMessage(body);
sendMessage(message);
return message;
}
public MapMessage sendMessage(Map<String, Object> body) throws JMSException {
MapMessage message = this.createMessage(body);
sendMessage(message);
return message;
}
public ObjectMessage sendMessage(Serializable body) throws JMSException {
ObjectMessage message = this.createMessage(body);
sendMessage(message);
return message;
}
public BytesMessage sendMessageWithProperties(byte[] body, Map<String, Object> properties) throws JMSException {
BytesMessage message = this.createMessage(body, properties);
sendMessage(message);
return message;
}
public TextMessage sendMessageWithProperties(String body, Map<String, Object> properties) throws JMSException {
TextMessage message = this.createMessage(body, properties);
sendMessage(message);
return message;
}
public MapMessage sendMessageWithProperties(Map<String, Object> body, Map<String, Object> properties) throws JMSException {
MapMessage message = this.createMessage(body, properties);
sendMessage(message);
return message;
}
public ObjectMessage sendMessageWithProperties(Serializable body, Map<String, Object> properties) throws JMSException {
ObjectMessage message = this.createMessage(body, properties);
sendMessage(message);
return message;
}
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.io.Serializable;
import java.net.URI;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
public class ActiveMQDynamicQueueSenderResource extends AbstractActiveMQProducerResource {
public ActiveMQDynamicQueueSenderResource(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
}
public ActiveMQDynamicQueueSenderResource(URI brokerURI) {
super(brokerURI);
}
public ActiveMQDynamicQueueSenderResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(embeddedActiveMQBroker);
}
public ActiveMQDynamicQueueSenderResource(URI brokerURI, String userName, String password) {
super(brokerURI, userName, password);
}
public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, ActiveMQConnectionFactory connectionFactory) {
super(defaultDestinationName, connectionFactory);
}
public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, URI brokerURI) {
super(defaultDestinationName, brokerURI);
}
public ActiveMQDynamicQueueSenderResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public ActiveMQDynamicQueueSenderResource(String defaultDestinationName, URI brokerURI, String userName, String password) {
super(defaultDestinationName, brokerURI, userName, password);
}
@Override
protected void createClient() throws JMSException {
producer = session.createProducer(null);
}
@Override
public byte getDestinationType() {
return ActiveMQDestination.QUEUE_TYPE;
}
@Override
public void sendMessage(Message message) throws JMSException {
if (destination == null) {
throw new IllegalStateException("Destination is not specified");
}
producer.send(destination, message);
}
public void sendMessage(String destinationName, Message message) throws JMSException {
producer.send(createDestination(destinationName), message);
}
public BytesMessage sendMessage(String destinationName, byte[] body) throws JMSException {
BytesMessage message = this.createMessage(body);
sendMessage(destinationName, message);
return message;
}
public TextMessage sendMessage(String destinationName, String body) throws JMSException {
TextMessage message = this.createMessage(body);
sendMessage(destinationName, message);
return message;
}
public MapMessage sendMessage(String destinationName, Map<String, Object> body) throws JMSException {
MapMessage message = this.createMessage(body);
sendMessage(destinationName, message);
return message;
}
public ObjectMessage sendMessage(String destinationName, Serializable body) throws JMSException {
ObjectMessage message = this.createMessage(body);
sendMessage(destinationName, message);
return message;
}
public BytesMessage sendMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) throws JMSException {
BytesMessage message = this.createMessage(body, properties);
sendMessage(destinationName, message);
return message;
}
public TextMessage sendMessageWithProperties(String destinationName, String body, Map<String, Object> properties) throws JMSException {
TextMessage message = this.createMessage(body, properties);
sendMessage(destinationName, message);
return message;
}
public MapMessage sendMessageWithProperties(String destinationName, Map<String, Object> body, Map<String, Object> properties) throws JMSException {
MapMessage message = this.createMessage(body, properties);
sendMessage(destinationName, message);
return message;
}
public ObjectMessage sendMessageWithProperties(String destinationName, Serializable body, Map<String, Object> properties) throws JMSException {
ObjectMessage message = this.createMessage(body, properties);
sendMessage(destinationName, message);
return message;
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.io.Serializable;
import java.net.URI;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
public class ActiveMQDynamicTopicPublisherResource extends AbstractActiveMQProducerResource {
public ActiveMQDynamicTopicPublisherResource(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
}
public ActiveMQDynamicTopicPublisherResource(URI brokerURI) {
super(brokerURI);
}
public ActiveMQDynamicTopicPublisherResource(EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(embeddedActiveMQBroker);
}
public ActiveMQDynamicTopicPublisherResource(URI brokerURI, String userName, String password) {
super(brokerURI, userName, password);
}
public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, ActiveMQConnectionFactory connectionFactory) {
super(defaultDestinationName, connectionFactory);
}
public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, URI brokerURI) {
super(defaultDestinationName, brokerURI);
}
public ActiveMQDynamicTopicPublisherResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public ActiveMQDynamicTopicPublisherResource(String defaultDestinationName, URI brokerURI, String userName, String password) {
super(defaultDestinationName, brokerURI, userName, password);
}
@Override
protected void createClient() throws JMSException {
producer = session.createProducer(null);
}
@Override
public byte getDestinationType() {
return ActiveMQDestination.TOPIC_TYPE;
}
@Override
public void sendMessage(Message message) throws JMSException {
if (destination == null) {
throw new IllegalStateException("Destination is not specified");
}
producer.send(destination, message);
}
public void sendMessage(String destinationName, Message message) throws JMSException {
producer.send(createDestination(destinationName), message);
}
public BytesMessage sendMessage(String destinationName, byte[] body) throws JMSException {
BytesMessage message = this.createMessage(body);
sendMessage(destinationName, message);
return message;
}
public TextMessage sendMessage(String destinationName, String body) throws JMSException {
TextMessage message = this.createMessage(body);
sendMessage(destinationName, message);
return message;
}
public MapMessage sendMessage(String destinationName, Map<String, Object> body) throws JMSException {
MapMessage message = this.createMessage(body);
sendMessage(destinationName, message);
return message;
}
public ObjectMessage sendMessage(String destinationName, Serializable body) throws JMSException {
ObjectMessage message = this.createMessage(body);
sendMessage(destinationName, message);
return message;
}
public BytesMessage sendMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) throws JMSException {
BytesMessage message = this.createMessage(body, properties);
sendMessage(destinationName, message);
return message;
}
public TextMessage sendMessageWithProperties(String destinationName, String body, Map<String, Object> properties) throws JMSException {
TextMessage message = this.createMessage(body, properties);
sendMessage(destinationName, message);
return message;
}
public MapMessage sendMessageWithProperties(String destinationName, Map<String, Object> body, Map<String, Object> properties) throws JMSException {
MapMessage message = this.createMessage(body, properties);
sendMessage(destinationName, message);
return message;
}
public ObjectMessage sendMessageWithProperties(String destinationName, Serializable body, Map<String, Object> properties) throws JMSException {
ObjectMessage message = this.createMessage(body, properties);
sendMessage(destinationName, message);
return message;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.net.URI;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
public class ActiveMQQueueReceiverResource extends AbstractActiveMQConsumerResource {
public ActiveMQQueueReceiverResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
super(destinationName, connectionFactory);
}
public ActiveMQQueueReceiverResource(String destinationName, URI brokerURI) {
super(destinationName, brokerURI);
}
public ActiveMQQueueReceiverResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public ActiveMQQueueReceiverResource(String destinationName, URI brokerURI, String userName, String password) {
super(destinationName, brokerURI, userName, password);
}
@Override
public byte getDestinationType() {
return ActiveMQDestination.QUEUE_TYPE;
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.net.URI;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
public class ActiveMQQueueSenderResource extends AbstractActiveMQProducerResource {
public ActiveMQQueueSenderResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
super(destinationName, connectionFactory);
}
public ActiveMQQueueSenderResource(String destinationName, URI brokerURI) {
super(destinationName, brokerURI);
}
public ActiveMQQueueSenderResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public ActiveMQQueueSenderResource(String destinationName, URI brokerURI, String userName, String password) {
super(destinationName, brokerURI, userName, password);
}
@Override
public byte getDestinationType() {
return ActiveMQDestination.QUEUE_TYPE;
}
@Override
protected void createClient() throws JMSException {
producer = session.createProducer(destination);
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.net.URI;
import javax.jms.JMSException;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
public class ActiveMQTopicDurableSubscriberResource extends AbstractActiveMQConsumerResource {
String clientId = "test-client-id";
String subscriberName = "test-subscriber";
public ActiveMQTopicDurableSubscriberResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
super(destinationName, connectionFactory);
}
public ActiveMQTopicDurableSubscriberResource(String destinationName, URI brokerURI) {
super(destinationName, brokerURI);
}
public ActiveMQTopicDurableSubscriberResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public ActiveMQTopicDurableSubscriberResource(String destinationName, URI brokerURI, String userName, String password) {
super(destinationName, brokerURI, userName, password);
}
@Override
public byte getDestinationType() {
return ActiveMQDestination.TOPIC_TYPE;
}
@Override
protected void createClient() throws JMSException {
consumer = session.createDurableSubscriber((Topic) destination, subscriberName);
}
@Override
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getSubscriberName() {
return subscriberName;
}
public void setSubscriberName(String subscriberName) {
this.subscriberName = subscriberName;
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.net.URI;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
public class ActiveMQTopicPublisherResource extends AbstractActiveMQProducerResource {
public ActiveMQTopicPublisherResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
super(destinationName, connectionFactory);
}
public ActiveMQTopicPublisherResource(String destinationName, URI brokerURI) {
super(destinationName, brokerURI);
}
public ActiveMQTopicPublisherResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public ActiveMQTopicPublisherResource(String destinationName, URI brokerURI, String userName, String password) {
super(destinationName, brokerURI, userName, password);
}
@Override
public String getDestinationName() {
try {
if (producer != null && producer.getDestination() != null) {
return producer.getDestination().toString();
}
} catch (JMSException e) {
// eat this
}
return null;
}
@Override
public byte getDestinationType() {
return ActiveMQDestination.TOPIC_TYPE;
}
@Override
protected void createClient() throws JMSException {
producer = session.createProducer(destination);
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.net.URI;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
public class ActiveMQTopicSubscriberResource extends AbstractActiveMQConsumerResource {
public ActiveMQTopicSubscriberResource(String destinationName, ActiveMQConnectionFactory connectionFactory) {
super(destinationName, connectionFactory);
}
public ActiveMQTopicSubscriberResource(String destinationName, URI brokerURI) {
super(destinationName, brokerURI);
}
public ActiveMQTopicSubscriberResource(String destinationName, EmbeddedActiveMQBroker embeddedActiveMQBroker) {
super(destinationName, embeddedActiveMQBroker);
}
public ActiveMQTopicSubscriberResource(String destinationName, URI brokerURI, String userName, String password) {
super(destinationName, brokerURI, userName, password);
}
@Override
public byte getDestinationType() {
return ActiveMQDestination.TOPIC_TYPE;
}
}

View File

@ -16,23 +16,37 @@
*/
package org.apache.activemq.junit;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
/**
* A JUnit Rule that embeds an ActiveMQ broker into a test.
*/
@ -40,15 +54,15 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
Logger log = LoggerFactory.getLogger(this.getClass());
BrokerService brokerService;
InternalClient internalClient;
/**
* Create an embedded ActiveMQ broker using defaults
*
* <p>
* The defaults are:
* - the broker name is 'embedded-broker'
* - JMX is disabled
* - Persistence is disabled
*
* - the broker name is 'embedded-broker'
* - JMX is disabled
* - Persistence is disabled
*/
public EmbeddedActiveMQBroker() {
brokerService = new BrokerService();
@ -61,7 +75,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
/**
* Create an embedded ActiveMQ broker using a configuration URI
*/
public EmbeddedActiveMQBroker(String configurationURI ) {
public EmbeddedActiveMQBroker(String configurationURI) {
try {
brokerService = BrokerFactory.createBroker(configurationURI);
} catch (Exception ex) {
@ -72,7 +86,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
/**
* Create an embedded ActiveMQ broker using a configuration URI
*/
public EmbeddedActiveMQBroker(URI configurationURI ) {
public EmbeddedActiveMQBroker(URI configurationURI) {
try {
brokerService = BrokerFactory.createBroker(configurationURI);
} catch (Exception ex) {
@ -80,13 +94,26 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
}
}
public static void setMessageProperties(Message message, Map<String, Object> properties) {
if (properties != null && properties.size() > 0) {
for (Map.Entry<String, Object> property : properties.entrySet()) {
try {
message.setObjectProperty(property.getKey(), property.getValue());
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set property {%s = %s}", property.getKey(), property.getValue().toString()), jmsEx);
}
}
}
}
/**
* Customize the configuration of the embedded ActiveMQ broker
*
* <p>
* This method is called before the embedded ActiveMQ broker is started, and can
* be overridden to this method to customize the broker configuration.
*/
protected void configure() {}
protected void configure() {
}
/**
* Start the embedded ActiveMQ broker, blocking until the broker has successfully started.
@ -98,6 +125,8 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
try {
this.configure();
brokerService.start();
internalClient = new InternalClient();
internalClient.start();
} catch (Exception ex) {
throw new RuntimeException("Exception encountered starting embedded ActiveMQ broker: {}" + this.getBrokerName(), ex);
}
@ -112,6 +141,10 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
* be stopped manually to support advanced testing scenarios.
*/
public void stop() {
if (internalClient != null) {
internalClient.stop();
internalClient = null;
}
if (!brokerService.isStopped()) {
try {
brokerService.stop();
@ -158,7 +191,7 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
*/
public ActiveMQConnectionFactory createConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerService.getVmConnectorURI().toString());
connectionFactory.setBrokerURL(getVmURL());
return connectionFactory;
}
@ -187,15 +220,64 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
}
/**
* Get the VM URL for the embedded ActiveMQ Broker
* Get the failover VM URL for the embedded ActiveMQ Broker
* <p/>
* NOTE: The option is precreate=false option is appended to the URL to avoid the automatic creation of brokers
* NOTE: The create=false option is appended to the URL to avoid the automatic creation of brokers
* and the resulting duplicate broker errors
*
* @return the VM URL for the embedded broker
*/
public String getVmURL() {
return String.format("failover:(%s?create=false)", brokerService.getVmConnectorURI().toString());
return getVmURL(true);
}
/**
* Get the VM URL for the embedded ActiveMQ Broker
* <p/>
* NOTE: The create=false option is appended to the URL to avoid the automatic creation of brokers
* and the resulting duplicate broker errors
*
* @param failoverURL if true a failover URL will be returned
* @return the VM URL for the embedded broker
*/
public String getVmURL(boolean failoverURL) {
if (failoverURL) {
return String.format("failover:(%s?create=false)", brokerService.getVmConnectorURI().toString());
}
return brokerService.getVmConnectorURI().toString() + "?create=false";
}
/**
* Get the failover VM URI for the embedded ActiveMQ Broker
* <p/>
* NOTE: The create=false option is appended to the URI to avoid the automatic creation of brokers
* and the resulting duplicate broker errors
*
* @return the VM URI for the embedded broker
*/
public URI getVmURI() {
return getVmURI(true);
}
/**
* Get the VM URI for the embedded ActiveMQ Broker
* <p/>
* NOTE: The create=false option is appended to the URI to avoid the automatic creation of brokers
* and the resulting duplicate broker errors
*
* @param failoverURI if true a failover URI will be returned
* @return the VM URI for the embedded broker
*/
public URI getVmURI(boolean failoverURI) {
URI result;
try {
result = new URI(getVmURL(failoverURI));
} catch (URISyntaxException uriEx) {
throw new RuntimeException("Unable to create failover URI", uriEx);
}
return result;
}
/**
@ -326,64 +408,53 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
/**
* Get the number of messages in a specific JMS Destination.
* <p/>
* The full name of the JMS destination including the prefix should be provided - i.e. queue:myQueue
* or topic:myTopic. If the destination type prefix is not included in the destination name, a prefix
* of "queue:" is assumed.
* The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
* or topic://myTopic. If the destination type prefix is not included in the destination name, a prefix
* of "queue://" is assumed.
*
* @param fullDestinationName the full name of the JMS Destination
* @param destinationName the full name of the JMS Destination
* @return the number of messages in the JMS Destination
*/
public int getMessageCount(String fullDestinationName) throws Exception {
final int QUEUE_TYPE = 1;
final int TOPIC_TYPE = 2;
public long getMessageCount(String destinationName) {
if (null == brokerService) {
throw new IllegalStateException("BrokerService has not yet been created - was before() called?");
}
int destinationType = QUEUE_TYPE;
String destinationName = fullDestinationName;
if (fullDestinationName.startsWith("queue:")) {
destinationName = fullDestinationName.substring(fullDestinationName.indexOf(':') + 1);
} else if (fullDestinationName.startsWith("topic:")) {
destinationType = TOPIC_TYPE;
destinationName = fullDestinationName.substring(fullDestinationName.indexOf(':') + 1);
// TODO: Figure out how to do this for Topics
Destination destination = getDestination(destinationName);
if (destination == null) {
throw new RuntimeException("Failed to find destination: " + destinationName);
}
int messageCount = -1;
boolean foundDestination = false;
for (Destination destination : brokerService.getBroker().getDestinationMap().values()) {
String tmpName = destination.getName();
if (tmpName.equalsIgnoreCase(destinationName)) {
switch (destinationType) {
case QUEUE_TYPE:
if (destination instanceof Queue) {
messageCount = destination.getMessageStore().getMessageCount();
foundDestination = true;
}
break;
case TOPIC_TYPE:
if (destination instanceof Topic) {
messageCount = destination.getMessageStore().getMessageCount();
foundDestination = true;
}
break;
default:
// Should never see this
log.error("Type didn't match: {}", destination.getClass().getName());
}
}
if (foundDestination) {
break;
}
// return destination.getMessageStore().getMessageCount();
return destination.getDestinationStatistics().getMessages().getCount();
}
/**
* Get the ActiveMQ destination
* <p/>
* The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
* or topic://myTopic. If the destination type prefix is not included in the destination name, a prefix
* of "queue://" is assumed.
*
* @param destinationName the full name of the JMS Destination
* @return the ActiveMQ destination, null if not found
*/
public Destination getDestination(String destinationName) {
if (null == brokerService) {
throw new IllegalStateException("BrokerService has not yet been created - was before() called?");
}
if (!foundDestination) {
log.warn("Didn't find destination {} in broker {}", fullDestinationName, getBrokerName());
Destination destination = null;
try {
destination = brokerService.getDestination(ActiveMQDestination.createDestination(destinationName, QUEUE_TYPE));
} catch (RuntimeException runtimeEx) {
throw runtimeEx;
} catch (Exception ex) {
throw new EmbeddedActiveMQBrokerException("Unexpected exception getting destination from broker", ex);
}
return messageCount;
return destination;
}
private PolicyEntry getDefaultPolicyEntry() {
@ -401,4 +472,323 @@ public class EmbeddedActiveMQBroker extends ExternalResource {
return defaultEntry;
}
public BytesMessage createBytesMessage() {
return internalClient.createBytesMessage();
}
public TextMessage createTextMessage() {
return internalClient.createTextMessage();
}
public MapMessage createMapMessage() {
return internalClient.createMapMessage();
}
public ObjectMessage createObjectMessage() {
return internalClient.createObjectMessage();
}
public StreamMessage createStreamMessage() {
return internalClient.createStreamMessage();
}
public BytesMessage createMessage(byte[] body) {
return this.createMessage(body, null);
}
public TextMessage createMessage(String body) {
return this.createMessage(body, null);
}
public MapMessage createMessage(Map<String, Object> body) {
return this.createMessage(body, null);
}
public ObjectMessage createMessage(Serializable body) {
return this.createMessage(body, null);
}
public BytesMessage createMessage(byte[] body, Map<String, Object> properties) {
BytesMessage message = this.createBytesMessage();
if (body != null) {
try {
message.writeBytes(body);
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on BytesMessage", new String(body)), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public TextMessage createMessage(String body, Map<String, Object> properties) {
TextMessage message = this.createTextMessage();
if (body != null) {
try {
message.setText(body);
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on TextMessage", body), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) {
MapMessage message = this.createMapMessage();
if (body != null) {
for (Map.Entry<String, Object> entry : body.entrySet()) {
try {
message.setObject(entry.getKey(), entry.getValue());
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body entry {%s = %s} on MapMessage", entry.getKey(), entry.getValue().toString()), jmsEx);
}
}
}
setMessageProperties(message, properties);
return message;
}
public ObjectMessage createMessage(Serializable body, Map<String, Object> properties) {
ObjectMessage message = this.createObjectMessage();
if (body != null) {
try {
message.setObject(body);
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on ObjectMessage", body.toString()), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public void pushMessage(String destinationName, Message message) {
if (destinationName == null) {
throw new IllegalArgumentException("pushMessage failure - destination name is required");
} else if (message == null) {
throw new IllegalArgumentException("pushMessage failure - a Message is required");
}
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
internalClient.pushMessage(destination, message);
}
public BytesMessage pushMessage(String destinationName, byte[] body) {
BytesMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public TextMessage pushMessage(String destinationName, String body) {
TextMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public MapMessage pushMessage(String destinationName, Map<String, Object> body) {
MapMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public ObjectMessage pushMessage(String destinationName, Serializable body) {
ObjectMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public BytesMessage pushMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) {
BytesMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public TextMessage pushMessageWithProperties(String destinationName, String body, Map<String, Object> properties) {
TextMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public MapMessage pushMessageWithProperties(String destinationName, Map<String, Object> body, Map<String, Object> properties) {
MapMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public ObjectMessage pushMessageWithProperties(String destinationName, Serializable body, Map<String, Object> properties) {
ObjectMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public Message peekMessage(String destinationName) {
if (null == brokerService) {
throw new NullPointerException("peekMessage failure - BrokerService is null");
}
if (destinationName == null) {
throw new IllegalArgumentException("peekMessage failure - destination name is required");
}
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
Destination brokerDestination = null;
try {
brokerDestination = brokerService.getDestination(destination);
} catch (Exception ex) {
throw new EmbeddedActiveMQBrokerException("peekMessage failure - unexpected exception getting destination from BrokerService", ex);
}
if (brokerDestination == null) {
throw new IllegalStateException(String.format("peekMessage failure - destination %s not found in broker %s", destination.toString(), brokerService.getBrokerName()));
}
org.apache.activemq.command.Message[] messages = brokerDestination.browse();
if (messages != null && messages.length > 0) {
return (Message) messages[0];
}
return null;
}
public BytesMessage peekBytesMessage(String destinationName) {
return (BytesMessage) peekMessage(destinationName);
}
public TextMessage peekTextMessage(String destinationName) {
return (TextMessage) peekMessage(destinationName);
}
public MapMessage peekMapMessage(String destinationName) {
return (MapMessage) peekMessage(destinationName);
}
public ObjectMessage peekObjectMessage(String destinationName) {
return (ObjectMessage) peekMessage(destinationName);
}
public StreamMessage peekStreamMessage(String destinationName) {
return (StreamMessage) peekMessage(destinationName);
}
public static class EmbeddedActiveMQBrokerException extends RuntimeException {
public EmbeddedActiveMQBrokerException(String message) {
super(message);
}
public EmbeddedActiveMQBrokerException(String message, Exception cause) {
super(message, cause);
}
}
private class InternalClient {
ActiveMQConnectionFactory connectionFactory;
Connection connection;
Session session;
MessageProducer producer;
public InternalClient() {
}
void start() {
connectionFactory = createConnectionFactory();
try {
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
connection.start();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Internal Client creation failure", jmsEx);
}
}
void stop() {
if (null != connection) {
try {
connection.close();
} catch (JMSException jmsEx) {
log.warn("JMSException encounter closing InternalClient connection - ignoring", jmsEx);
}
}
}
public BytesMessage createBytesMessage() {
checkSession();
try {
return session.createBytesMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create BytesMessage", jmsEx);
}
}
public TextMessage createTextMessage() {
checkSession();
try {
return session.createTextMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create TextMessage", jmsEx);
}
}
public MapMessage createMapMessage() {
checkSession();
try {
return session.createMapMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create MapMessage", jmsEx);
}
}
public ObjectMessage createObjectMessage() {
checkSession();
try {
return session.createObjectMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create ObjectMessage", jmsEx);
}
}
public StreamMessage createStreamMessage() {
checkSession();
try {
return session.createStreamMessage();
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException("Failed to create StreamMessage", jmsEx);
}
}
public void pushMessage(ActiveMQDestination destination, Message message) {
if (producer == null) {
throw new IllegalStateException("JMS MessageProducer is null - has the InternalClient been started?");
}
try {
producer.send(destination, message);
} catch (JMSException jmsEx) {
throw new EmbeddedActiveMQBrokerException(String.format("Failed to push %s to %s", message.getClass().getSimpleName(), destination.toString()), jmsEx);
}
}
void checkSession() {
if (session == null) {
throw new IllegalStateException("JMS Session is null - has the InternalClient been started?");
}
}
}
}