git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1187001 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-20 19:21:55 +00:00
parent 8ce077800f
commit 4b202c1d7a
3 changed files with 143 additions and 78 deletions

View File

@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Reader;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@ -29,16 +30,16 @@ import org.apache.activemq.wireformat.WireFormat;
/**
* Adds the extra methods available to text based wire format implementations
*
*
*
*
*/
public abstract class TextWireFormat implements WireFormat {
public abstract Object unmarshalText(String text);
public abstract Object unmarshalText(String text) throws IOException;
public abstract Object unmarshalText(Reader reader);
public abstract Object unmarshalText(Reader reader) throws IOException;
public abstract String marshalText(Object command);
public abstract String marshalText(Object command) throws IOException;
public void marshal(Object command, DataOutput out) throws IOException {
String text = marshalText(command);

View File

@ -16,11 +16,14 @@
*/
package org.apache.activemq.transport.xstream;
import java.io.IOException;
import java.io.Reader;
import com.thoughtworks.xstream.XStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.MarshallAware;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.wireformat.WireFormat;
@ -28,8 +31,8 @@ import org.apache.activemq.wireformat.WireFormat;
* A {@link WireFormat} implementation which uses the <a
* href="http://xstream.codehaus.org/>XStream</a> library to marshall commands
* onto the wire
*
*
*
*
*/
public class XStreamWireFormat extends TextWireFormat {
private XStream xStream;
@ -55,13 +58,22 @@ public class XStreamWireFormat extends TextWireFormat {
return (Command)getXStream().fromXML(reader);
}
public String marshalText(Object command) {
public String marshalText(Object command) throws IOException {
if (command instanceof MarshallAware) {
((MarshallAware)command).beforeMarshall(this);
} else if(command instanceof MessageDispatch) {
MessageDispatch dispatch = (MessageDispatch) command;
if (dispatch != null && dispatch.getMessage() != null) {
dispatch.getMessage().beforeMarshall(this);
}
}
return getXStream().toXML(command);
}
/**
* Can this wireformat process packets of this version
*
*
* @param version the version number to test
* @return true if can accept the version
*/

View File

@ -63,76 +63,19 @@ public class HttpSendCompressedMessagesTest {
private static final String destinationName = "HttpCompressionTopic";
private void sendTextMessage(boolean compressed) throws Exception {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < 10; ++i) {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage(builder.toString()));
}
private void sendBytesMessage(boolean compressed) throws Exception {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < 10; ++i) {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
BytesMessage message = session.createBytesMessage();
message.writeUTF(builder.toString());
producer.send(message);
}
private void sendStreamMessage(boolean compressed) throws Exception {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < 10; ++i) {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
StreamMessage message = session.createStreamMessage();
message.writeString(builder.toString());
producer.send(message);
}
private void sendMapMessage(boolean compressed) throws Exception {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < 10; ++i) {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
MapMessage message = session.createMapMessage();
message.setString("content", builder.toString());
producer.send(message);
@Test
public void testTextMessageCompressionFromTcp() throws Exception {
sendTextMessage(true);
doTestTextMessageCompression();
}
@Test
public void testTextMessageCompression() throws Exception {
sendTextMessage(true);
public void testTextMessageCompressionFromHttp() throws Exception {
sendTextMessage(httpConnectionFactory, true);
doTestTextMessageCompression();
}
private void doTestTextMessageCompression() throws Exception {
ActiveMQTextMessage tcpMessage = (ActiveMQTextMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
ActiveMQTextMessage httpMessage = (ActiveMQTextMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
@ -168,9 +111,18 @@ public class HttpSendCompressedMessagesTest {
}
@Test
public void testBytesMessageCompression() throws Exception {
public void testBytesMessageCompressionFromTcp() throws Exception {
sendBytesMessage(true);
doTestBytesMessageCompression();
}
@Test
public void testBytesMessageCompressionFromHttp() throws Exception {
sendBytesMessage(httpConnectionFactory, true);
doTestBytesMessageCompression();
}
private void doTestBytesMessageCompression() throws Exception {
ActiveMQBytesMessage tcpMessage = (ActiveMQBytesMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
ActiveMQBytesMessage httpMessage = (ActiveMQBytesMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
@ -206,9 +158,18 @@ public class HttpSendCompressedMessagesTest {
}
@Test
public void testStreamMessageCompression() throws Exception {
public void testStreamMessageCompressionFromTcp() throws Exception {
sendStreamMessage(true);
doTestStreamMessageCompression();
}
@Test
public void testStreamMessageCompressionFromHttp() throws Exception {
sendStreamMessage(httpConnectionFactory, true);
doTestStreamMessageCompression();
}
private void doTestStreamMessageCompression() throws Exception {
ActiveMQStreamMessage tcpMessage = (ActiveMQStreamMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
ActiveMQStreamMessage httpMessage = (ActiveMQStreamMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
@ -244,9 +205,18 @@ public class HttpSendCompressedMessagesTest {
}
@Test
public void testMapMessageCompression() throws Exception {
public void testMapMessageCompressionFromTcp() throws Exception {
sendMapMessage(true);
doTestMapMessageCompression();
}
@Test
public void testMapMessageCompressionFromHttp() throws Exception {
sendMapMessage(httpConnectionFactory, true);
doTestMapMessageCompression();
}
private void doTestMapMessageCompression() throws Exception {
ActiveMQMapMessage tcpMessage = (ActiveMQMapMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
ActiveMQMapMessage httpMessage = (ActiveMQMapMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
@ -317,4 +287,86 @@ public class HttpSendCompressedMessagesTest {
broker.waitUntilStopped();
}
}
private void sendTextMessage(boolean compressed) throws Exception {
sendTextMessage(tcpConnectionFactory, compressed);
}
private void sendTextMessage(ActiveMQConnectionFactory factory, boolean compressed) throws Exception {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < 10; ++i) {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage(builder.toString()));
}
private void sendBytesMessage(boolean compressed) throws Exception {
sendBytesMessage(tcpConnectionFactory, compressed);
}
private void sendBytesMessage(ActiveMQConnectionFactory factory, boolean compressed) throws Exception {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < 10; ++i) {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
BytesMessage message = session.createBytesMessage();
message.writeUTF(builder.toString());
producer.send(message);
}
private void sendStreamMessage(boolean compressed) throws Exception {
sendStreamMessage(tcpConnectionFactory, compressed);
}
private void sendStreamMessage(ActiveMQConnectionFactory factory, boolean compressed) throws Exception {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < 10; ++i) {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
StreamMessage message = session.createStreamMessage();
message.writeString(builder.toString());
producer.send(message);
}
private void sendMapMessage(boolean compressed) throws Exception {
sendMapMessage(tcpConnectionFactory, compressed);
}
private void sendMapMessage(ActiveMQConnectionFactory factory, boolean compressed) throws Exception {
StringBuilder builder = new StringBuilder();
for(int i = 0; i < 10; ++i) {
builder.append(UUID.randomUUID().toString());
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setUseCompression(compressed);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(destinationName);
MessageProducer producer = session.createProducer(destination);
MapMessage message = session.createMapMessage();
message.setString("content", builder.toString());
producer.send(message);
}
}