Adds support for setting useCompression on NetworkConnectors to enforce that all messages forwarded across the network are compressed.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1353042 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-06-22 21:53:01 +00:00
parent 60624c4e38
commit 41c93667dc
13 changed files with 606 additions and 111 deletions

View File

@ -118,7 +118,8 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
storeContent();
}
private void storeContent() {
@Override
public void storeContent() {
try {
if (dataOut != null) {
dataOut.close();
@ -853,4 +854,22 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
public String toString() {
return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
}
@Override
protected void doCompress() throws IOException {
compressed = true;
ByteSequence bytes = getContent();
int length = bytes.getLength();
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
bytesOut.write(new byte[4]);
DeflaterOutputStream os = new DeflaterOutputStream(bytesOut);
DataOutputStream dataOut = new DataOutputStream(os);
dataOut.write(bytes.data, bytes.offset, bytes.length);
dataOut.flush();
dataOut.close();
bytes = bytesOut.toByteSequence();
ByteSequenceData.writeIntBig(bytes, length);
bytes.offset = 0;
setContent(bytes);
}
}

View File

@ -131,7 +131,8 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
map.clear();
}
private void storeContent() {
@Override
public void storeContent() {
try {
if (getContent() == null && !map.isEmpty()) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
@ -741,6 +742,12 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
setContent(null);
}
@Override
public void compress() throws IOException {
storeContent();
super.compress();
}
public String toString() {
return super.toString() + " ActiveMQMapMessage{ " + "theTable = " + map + " }";
}

View File

@ -20,15 +20,16 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.scheduler.CronParser;
@ -53,7 +54,6 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
return DATA_STRUCTURE_TYPE;
}
@Override
public Message copy() {
ActiveMQMessage copy = new ActiveMQMessage();
@ -280,6 +280,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
}
}
@SuppressWarnings("rawtypes")
public Enumeration getPropertyNames() throws JMSException {
try {
Vector<String> result = new Vector<String>(this.getProperties().keySet());
@ -294,6 +295,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
* @return Enumeration of all property names on this message
* @throws JMSException
*/
@SuppressWarnings("rawtypes")
public Enumeration getAllPropertyNames() throws JMSException {
try {
Vector<String> result = new Vector<String>(this.getProperties().keySet());
@ -305,7 +307,6 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
}
interface PropertySetter {
void set(Message message, Object value) throws MessageFormatException;
}
@ -445,10 +446,8 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
}
}
public void setProperties(Map properties) throws JMSException {
for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
public void setProperties(Map<String, ?> properties) throws JMSException {
for (Map.Entry<String, ?> entry : properties.entrySet()) {
// Lets use the object property method as we may contain standard
// extension headers like JMSXGroupID
setObjectProperty((String) entry.getKey(), entry.getValue());
@ -680,4 +679,8 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processMessage(this);
}
@Override
public void storeContent() {
}
}

View File

@ -89,6 +89,7 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
}
@Override
public void storeContent() {
ByteSequence bodyAsBytes = getContent();
if (bodyAsBytes == null && object != null) {
@ -214,6 +215,12 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
object = null;
}
@Override
public void compress() throws IOException {
storeContent();
super.compress();
}
public String toString() {
try {
getObject();

View File

@ -137,7 +137,8 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
storeContent();
}
private void storeContent() {
@Override
public void storeContent() {
if (dataOut != null) {
try {
dataOut.close();
@ -1146,6 +1147,12 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
}
}
@Override
public void compress() throws IOException {
storeContent();
super.compress();
}
public String toString() {
return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
}

View File

@ -105,7 +105,12 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public void beforeMarshall(WireFormat wireFormat) throws IOException {
super.beforeMarshall(wireFormat);
storeContent();
}
@Override
public void storeContent() {
try {
ByteSequence content = getContent();
if (content == null && text != null) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
@ -120,6 +125,9 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
dataOut.close();
setContent(bytesOut.toByteSequence());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// see https://issues.apache.org/activemq/browse/AMQ-2103

View File

@ -19,10 +19,14 @@ package org.apache.activemq.command;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.DeflaterOutputStream;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.Destination;
@ -94,6 +98,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
public abstract Message copy();
public abstract void clearBody() throws JMSException;
public abstract void storeContent();
// useful to reduce the memory footprint of a persisted message
public void clearMarshalledState() throws JMSException {
@ -743,6 +748,25 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
return false;
}
public void compress() throws IOException {
if (!isCompressed()) {
storeContent();
if (!isCompressed() && getContent() != null) {
doCompress();
}
}
}
protected void doCompress() throws IOException {
compressed = true;
ByteSequence bytes = getContent();
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
OutputStream os = new DeflaterOutputStream(bytesOut);
os.write(bytes.data, bytes.offset, bytes.length);
os.close();
setContent(bytesOut.toByteSequence());
}
@Override
public String toString() {
return toString(null);

View File

@ -666,7 +666,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
protected Message configureMessage(MessageDispatch md) {
protected Message configureMessage(MessageDispatch md) throws IOException {
Message message = md.getMessage().copy();
// Update the packet to show where it came from.
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
@ -676,6 +676,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
message.setOriginalTransactionId(message.getTransactionId());
}
message.setTransactionId(null);
if (configuration.isUseCompression()) {
message.compress();
}
return message;
}

View File

@ -77,6 +77,7 @@ public class ForwardingBridge implements Service {
private boolean dispatchAsync;
private String destinationFilter = ">";
private NetworkBridgeListener bridgeFailedListener;
private boolean useCompression = false;
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
this.localBroker = localBroker;
@ -232,11 +233,13 @@ public class ForwardingBridge implements Service {
}
message.setTransactionId(null);
if (isUseCompression()) {
message.compress();
}
if (!message.isResponseRequired()) {
// If the message was originally sent using async send, we
// will preserve that QOS
// by bridging it using an async send (small chance of
// message loss).
// If the message was originally sent using async send, we will preserve that
// QOS by bridging it using an async send (small chance of message loss).
remoteBroker.oneway(message);
dequeueCounter.incrementAndGet();
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
@ -382,4 +385,19 @@ public class ForwardingBridge implements Service {
return enqueueCounter.get();
}
/**
* @param useCompression
* True if forwarded Messages should have their bodies compressed.
*/
public void setUseCompression(boolean useCompression) {
this.useCompression = useCompression;
}
/**
* @return the vale of the useCompression setting, true if forwarded messages will be compressed.
*/
public boolean isUseCompression() {
return useCompression;
}
}

View File

@ -16,12 +16,12 @@
*/
package org.apache.activemq.network;
import java.util.List;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import java.util.List;
/**
* Configuration for a NetworkBridge
*
@ -53,6 +53,7 @@ public class NetworkBridgeConfiguration {
private boolean alwaysSyncSend = false;
private boolean staticBridge = false;
private boolean useCompression = false;
/**
* @return the conduitSubscriptions
@ -368,4 +369,19 @@ public class NetworkBridgeConfiguration {
public void setStaticBridge(boolean staticBridge) {
this.staticBridge = staticBridge;
}
/**
* @param useCompression
* True if the Network should enforce compression for messages sent.
*/
public void setUseCompression(boolean useCompression) {
this.useCompression = useCompression;
}
/**
* @return the useCompression setting, true if message will be compressed on send.
*/
public boolean isUseCompression() {
return useCompression;
}
}

View File

@ -87,7 +87,6 @@ public class ActiveMQMessageTest extends TestCase {
for (int i = 0; i < this.consumerIDs.length; i++) {
this.consumerIDs[i] = i;
}
}
/*
@ -126,7 +125,6 @@ public class ActiveMQMessageTest extends TestCase {
public void testSetToForeignJMSID() throws Exception {
ActiveMQMessage msg = new ActiveMQMessage();
msg.setJMSMessageID("ID:EMS-SERVER.8B443C380083:429");
}
/*
@ -356,6 +354,7 @@ public class ActiveMQMessageTest extends TestCase {
assertTrue(((Float)msg.getObjectProperty(name)).floatValue() == 1.3f);
}
@SuppressWarnings("rawtypes")
public void testGetPropertyNames() throws JMSException {
ActiveMQMessage msg = new ActiveMQMessage();
String name1 = "floatProperty";
@ -379,6 +378,7 @@ public class ActiveMQMessageTest extends TestCase {
assertFalse("prop name4 not found", found3);
}
@SuppressWarnings("rawtypes")
public void testGetAllPropertyNames() throws JMSException {
ActiveMQMessage msg = new ActiveMQMessage();
String name1 = "floatProperty";
@ -451,6 +451,10 @@ public class ActiveMQMessageTest extends TestCase {
@Override
public void clearBody() throws JMSException {
}
@Override
public void storeContent() {
}
};
msg.setProperty("stringProperty", "string");
@ -465,7 +469,7 @@ public class ActiveMQMessageTest extends TestCase {
msg.beforeMarshall(new OpenWireFormat());
Map properties = msg.getProperties();
Map<String, Object> properties = msg.getProperties();
assertEquals(properties.get("stringProperty"), "string");
assertEquals(((Byte)properties.get("byteProperty")).byteValue(), 1);
assertEquals(((Short)properties.get("shortProperty")).shortValue(), 1);
@ -475,7 +479,6 @@ public class ActiveMQMessageTest extends TestCase {
assertEquals(((Double)properties.get("doubleProperty")).doubleValue(), 1.1, 0);
assertEquals(((Boolean)properties.get("booleanProperty")).booleanValue(), true);
assertNull(properties.get("nullProperty"));
}
public void testSetNullProperty() throws JMSException {
@ -942,5 +945,4 @@ public class ActiveMQMessageTest extends TestCase {
msg.setJMSExpiration(System.currentTimeMillis() + 10000);
assertFalse(msg.isExpired());
}
}

View File

@ -0,0 +1,329 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import java.net.URI;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
public class CompressionOverNetworkTest {
protected static final int MESSAGE_COUNT = 10;
private static final Logger LOG = LoggerFactory.getLogger(CompressionOverNetworkTest.class);
protected AbstractApplicationContext context;
protected Connection localConnection;
protected Connection remoteConnection;
protected BrokerService localBroker;
protected BrokerService remoteBroker;
protected Session localSession;
protected Session remoteSession;
protected ActiveMQTopic included;
@Test
public void testCompressedOverCompressedNetwork() throws Exception {
ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection;
localAmqConnection.setUseCompression(true);
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
StringBuilder payload = new StringBuilder("test-");
for (int i = 0; i < 100; ++i) {
payload.append(UUID.randomUUID().toString());
}
Message test = localSession.createTextMessage(payload.toString());
producer.send(test);
Message msg = consumer1.receive(1000);
assertNotNull(msg);
ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
assertTrue(message.isCompressed());
assertEquals(payload.toString(), message.getText());
}
@Test
public void testTextMessageCompression() throws Exception {
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
StringBuilder payload = new StringBuilder("test-");
for (int i = 0; i < 100; ++i) {
payload.append(UUID.randomUUID().toString());
}
Message test = localSession.createTextMessage(payload.toString());
producer.send(test);
Message msg = consumer1.receive(1000);
assertNotNull(msg);
ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
assertTrue(message.isCompressed());
assertEquals(payload.toString(), message.getText());
}
@Test
public void testBytesMessageCompression() throws Exception {
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
StringBuilder payload = new StringBuilder("test-");
for (int i = 0; i < 100; ++i) {
payload.append(UUID.randomUUID().toString());
}
byte[] bytes = payload.toString().getBytes("UTF-8");
BytesMessage test = localSession.createBytesMessage();
test.writeBytes(bytes);
producer.send(test);
Message msg = consumer1.receive(1000*6000);
assertNotNull(msg);
ActiveMQBytesMessage message = (ActiveMQBytesMessage) msg;
assertTrue(message.isCompressed());
assertTrue(message.getContent().getLength() < bytes.length);
byte[] result = new byte[bytes.length];
assertEquals(bytes.length, message.readBytes(result));
assertEquals(-1, message.readBytes(result));
for(int i = 0; i < bytes.length; ++i) {
assertEquals(bytes[i], result[i]);
}
}
@Test
public void testStreamMessageCompression() throws Exception {
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
StreamMessage test = localSession.createStreamMessage();
for (int i = 0; i < 100; ++i) {
test.writeString("test string: " + i);
}
producer.send(test);
Message msg = consumer1.receive(1000);
assertNotNull(msg);
ActiveMQStreamMessage message = (ActiveMQStreamMessage) msg;
assertTrue(message.isCompressed());
for (int i = 0; i < 100; ++i) {
assertEquals("test string: " + i, message.readString());
}
}
@Test
public void testMapMessageCompression() throws Exception {
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
MapMessage test = localSession.createMapMessage();
for (int i = 0; i < 100; ++i) {
test.setString(Integer.toString(i), "test string: " + i);
}
producer.send(test);
Message msg = consumer1.receive(1000);
assertNotNull(msg);
ActiveMQMapMessage message = (ActiveMQMapMessage) msg;
assertTrue(message.isCompressed());
for (int i = 0; i < 100; ++i) {
assertEquals("test string: " + i, message.getString(Integer.toString(i)));
}
}
@Test
public void testObjectMessageCompression() throws Exception {
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
StringBuilder payload = new StringBuilder("test-");
for (int i = 0; i < 100; ++i) {
payload.append(UUID.randomUUID().toString());
}
Message test = localSession.createObjectMessage(payload.toString());
producer.send(test);
Message msg = consumer1.receive(1000);
assertNotNull(msg);
ActiveMQObjectMessage message = (ActiveMQObjectMessage) msg;
assertTrue(message.isCompressed());
assertEquals(payload.toString(), message.getObject());
}
private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
if (bridges.length > 0) {
LOG.info(brokerService + " bridges " + Arrays.toString(bridges));
DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
if (!forwardingBridges.isEmpty()) {
for (DemandSubscription demandSubscription : forwardingBridges.values()) {
if (demandSubscription.getLocalInfo().getDestination().equals(destination)) {
LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size());
return demandSubscription.size() >= min;
}
}
}
}
return false;
}
}));
}
@Before
public void setUp() throws Exception {
doSetUp(true);
}
@After
public void tearDown() throws Exception {
doTearDown();
}
protected void doTearDown() throws Exception {
localConnection.close();
remoteConnection.close();
localBroker.stop();
remoteBroker.stop();
}
protected void doSetUp(boolean deleteAllMessages) throws Exception {
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
URI remoteURI = remoteBroker.getVmConnectorURI();
fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
remoteConnection.start();
included = new ActiveMQTopic("include.test.bar");
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker.xml";
}
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker.xml";
}
protected BrokerService createBroker(String uri) throws Exception {
Resource resource = new ClassPathResource(uri);
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
resource = new ClassPathResource(uri);
factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService result = factory.getBroker();
for (NetworkConnector connector : result.getNetworkConnectors()) {
connector.setUseCompression(true);
}
return result;
}
protected BrokerService createLocalBroker() throws Exception {
return createBroker(getLocalBrokerURI());
}
protected BrokerService createRemoteBroker() throws Exception {
return createBroker(getRemoteBrokerURI());
}
}

View File

@ -19,8 +19,10 @@ package org.apache.activemq.network;
import javax.jms.DeliveryMode;
import junit.framework.Test;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -34,6 +36,56 @@ public class ForwardingBridgeTest extends NetworkTestSupport {
public int deliveryMode;
private ForwardingBridge bridge;
public void initCombosForTestForwardMessageCompressed() {
addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {new Byte(ActiveMQDestination.QUEUE_TYPE),
new Byte(ActiveMQDestination.TOPIC_TYPE)});
}
public void testForwardMessageCompressed() throws Exception {
bridge.setUseCompression(true);
// Start a producer on local broker
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo);
destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
// Start a consumer on a remote broker
StubConnection connection2 = createRemoteConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
connection2.send(consumerInfo);
Thread.sleep(1000);
// Give forwarding bridge a chance to finish setting up
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
// Send the message to the local boker.
connection1.send(createMessage(producerInfo, destination, deliveryMode));
// Make sure the message was delivered via the remote.
Message m = receiveMessage(connection2);
assertNotNull(m);
// Make sure its compressed now
ActiveMQMessage message = (ActiveMQMessage) m;
assertTrue(message.isCompressed());
}
public void initCombosForTestAddConsumerThenSend() {
addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)});