From 5b11ea0e8f685725c28fc41a9af20b442a7d8ea8 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 19 Oct 2011 13:50:38 +0000 Subject: [PATCH] Adds some new tests in the HTTP transport layer. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1186216 13f79535-47bb-0310-9956-ffa450edef68 --- .../http/HttpSendCompressedMessagesTest.java | 320 ++++++++++++++++++ 1 file changed, 320 insertions(+) create mode 100644 activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java diff --git a/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java b/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java new file mode 100644 index 0000000000..9560e6834d --- /dev/null +++ b/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.http; + +import static org.junit.Assert.*; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.util.ByteSequence; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpSendCompressedMessagesTest { + + private static final Logger LOG = LoggerFactory.getLogger(HttpSendCompressedMessagesTest.class); + + private BrokerService broker; + private static final String tcpBindAddress = "tcp://0.0.0.0:0"; + private static final String httpBindAddress = "http://0.0.0.0:8171"; + private ActiveMQConnectionFactory tcpConnectionFactory; + private ActiveMQConnectionFactory httpConnectionFactory; + private ActiveMQConnection tcpConnection; + private ActiveMQConnection httpConnection; + private Session tcpSession; + private Session httpSession; + private Topic destination; + private MessageConsumer tcpConsumer; + private MessageConsumer httpConsumer; + + private static final String destinationName = "HttpCompressionTopic"; + + private void sendTextMessage(boolean compressed) throws Exception { + + StringBuilder builder = new StringBuilder(); + for(int i = 0; i < 10; ++i) { + builder.append(UUID.randomUUID().toString()); + } + + ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection(); + connection.setUseCompression(compressed); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic destination = session.createTopic(destinationName); + MessageProducer producer = session.createProducer(destination); + producer.send(session.createTextMessage(builder.toString())); + } + + private void sendBytesMessage(boolean compressed) throws Exception { + + StringBuilder builder = new StringBuilder(); + for(int i = 0; i < 10; ++i) { + builder.append(UUID.randomUUID().toString()); + } + + ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection(); + connection.setUseCompression(compressed); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic destination = session.createTopic(destinationName); + MessageProducer producer = session.createProducer(destination); + BytesMessage message = session.createBytesMessage(); + message.writeUTF(builder.toString()); + producer.send(message); + } + + private void sendStreamMessage(boolean compressed) throws Exception { + + StringBuilder builder = new StringBuilder(); + for(int i = 0; i < 10; ++i) { + builder.append(UUID.randomUUID().toString()); + } + + ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection(); + connection.setUseCompression(compressed); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic destination = session.createTopic(destinationName); + MessageProducer producer = session.createProducer(destination); + StreamMessage message = session.createStreamMessage(); + message.writeString(builder.toString()); + producer.send(message); + } + + private void sendMapMessage(boolean compressed) throws Exception { + + StringBuilder builder = new StringBuilder(); + for(int i = 0; i < 10; ++i) { + builder.append(UUID.randomUUID().toString()); + } + + ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection(); + connection.setUseCompression(compressed); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic destination = session.createTopic(destinationName); + MessageProducer producer = session.createProducer(destination); + MapMessage message = session.createMapMessage(); + message.setString("content", builder.toString()); + producer.send(message); + } + + @Test + public void testTextMessageCompression() throws Exception { + sendTextMessage(true); + + ActiveMQTextMessage tcpMessage = (ActiveMQTextMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + ActiveMQTextMessage httpMessage = (ActiveMQTextMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + + assertNotNull(tcpMessage); + assertNotNull(httpMessage); + + ByteSequence tcpContent = tcpMessage.getContent(); + ByteSequence httpContent = httpMessage.getContent(); + + assertNotNull(tcpContent); + assertNotNull(httpContent); + + assertTrue(tcpMessage.isCompressed()); + assertTrue(httpMessage.isCompressed()); + + int tcpCompressedSize = tcpContent.getLength(); + int httpCompressedSize = httpContent.getLength(); + + assertEquals(tcpContent.getLength(), httpContent.getLength()); + assertEquals(tcpMessage.getText(), httpMessage.getText()); + + LOG.info("Received Message on TCP: " + tcpMessage.toString()); + LOG.info("Received Message on HTTP: " + httpMessage.toString()); + + sendTextMessage(false); + + ActiveMQTextMessage uncompressedHttpMessage = (ActiveMQTextMessage) + httpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + int httpUncompressedSize = uncompressedHttpMessage.getContent().getLength(); + + assertTrue(httpUncompressedSize > httpCompressedSize); + assertTrue(httpUncompressedSize > tcpCompressedSize); + } + + @Test + public void testBytesMessageCompression() throws Exception { + sendBytesMessage(true); + + ActiveMQBytesMessage tcpMessage = (ActiveMQBytesMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + ActiveMQBytesMessage httpMessage = (ActiveMQBytesMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + + assertNotNull(tcpMessage); + assertNotNull(httpMessage); + + ByteSequence tcpContent = tcpMessage.getContent(); + ByteSequence httpContent = httpMessage.getContent(); + + assertNotNull(tcpContent); + assertNotNull(httpContent); + + assertTrue(tcpMessage.isCompressed()); + assertTrue(httpMessage.isCompressed()); + + int tcpCompressedSize = tcpContent.getLength(); + int httpCompressedSize = httpContent.getLength(); + + assertEquals(tcpContent.getLength(), httpContent.getLength()); + assertEquals(tcpMessage.readUTF(), httpMessage.readUTF()); + + LOG.info("Received Message on TCP: " + tcpMessage.toString()); + LOG.info("Received Message on HTTP: " + httpMessage.toString()); + + sendBytesMessage(false); + + ActiveMQBytesMessage uncompressedHttpMessage = (ActiveMQBytesMessage) + httpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + int httpUncompressedSize = uncompressedHttpMessage.getContent().getLength(); + + assertTrue(httpUncompressedSize > httpCompressedSize); + assertTrue(httpUncompressedSize > tcpCompressedSize); + } + + @Test + public void testStreamMessageCompression() throws Exception { + sendStreamMessage(true); + + ActiveMQStreamMessage tcpMessage = (ActiveMQStreamMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + ActiveMQStreamMessage httpMessage = (ActiveMQStreamMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + + assertNotNull(tcpMessage); + assertNotNull(httpMessage); + + ByteSequence tcpContent = tcpMessage.getContent(); + ByteSequence httpContent = httpMessage.getContent(); + + assertNotNull(tcpContent); + assertNotNull(httpContent); + + assertTrue(tcpMessage.isCompressed()); + assertTrue(httpMessage.isCompressed()); + + int tcpCompressedSize = tcpContent.getLength(); + int httpCompressedSize = httpContent.getLength(); + + assertEquals(tcpContent.getLength(), httpContent.getLength()); + assertEquals(tcpMessage.readString(), httpMessage.readString()); + + LOG.info("Received Message on TCP: " + tcpMessage.toString()); + LOG.info("Received Message on HTTP: " + httpMessage.toString()); + + sendStreamMessage(false); + + ActiveMQStreamMessage uncompressedHttpMessage = (ActiveMQStreamMessage) + httpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + int httpUncompressedSize = uncompressedHttpMessage.getContent().getLength(); + + assertTrue(httpUncompressedSize > httpCompressedSize); + assertTrue(httpUncompressedSize > tcpCompressedSize); + } + + @Test + public void testMapMessageCompression() throws Exception { + sendMapMessage(true); + + ActiveMQMapMessage tcpMessage = (ActiveMQMapMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + ActiveMQMapMessage httpMessage = (ActiveMQMapMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + + assertNotNull(tcpMessage); + assertNotNull(httpMessage); + + ByteSequence tcpContent = tcpMessage.getContent(); + ByteSequence httpContent = httpMessage.getContent(); + + assertNotNull(tcpContent); + assertNotNull(httpContent); + + assertTrue(tcpMessage.isCompressed()); + assertTrue(httpMessage.isCompressed()); + + int tcpCompressedSize = tcpContent.getLength(); + int httpCompressedSize = httpContent.getLength(); + + assertEquals(tcpContent.getLength(), httpContent.getLength()); + assertEquals(tcpMessage.getString("content"), httpMessage.getString("content")); + + LOG.info("Received Message on TCP: " + tcpMessage.toString()); + LOG.info("Received Message on HTTP: " + httpMessage.toString()); + + sendMapMessage(false); + + ActiveMQMapMessage uncompressedHttpMessage = (ActiveMQMapMessage) + httpConsumer.receive(TimeUnit.SECONDS.toMillis(3)); + int httpUncompressedSize = uncompressedHttpMessage.getContent().getLength(); + + assertTrue(httpUncompressedSize > httpCompressedSize); + assertTrue(httpUncompressedSize > tcpCompressedSize); + } + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + TransportConnector tcpConnector = broker.addConnector(tcpBindAddress); + TransportConnector httpConnector = broker.addConnector(httpBindAddress); + broker.start(); + broker.waitUntilStarted(); + + WaitForJettyListener.waitForJettySocketToAccept(httpConnector.getPublishableConnectString()); + + tcpConnectionFactory = new ActiveMQConnectionFactory(tcpConnector.getPublishableConnectString()); + tcpConnectionFactory.setUseCompression(true); + httpConnectionFactory = new ActiveMQConnectionFactory(httpConnector.getPublishableConnectString()); + httpConnectionFactory.setUseCompression(true); + tcpConnection = (ActiveMQConnection) tcpConnectionFactory.createConnection(); + httpConnection = (ActiveMQConnection) httpConnectionFactory.createConnection(); + tcpSession = tcpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + httpSession = httpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = tcpSession.createTopic(destinationName); + tcpConsumer = tcpSession.createConsumer(destination); + httpConsumer = httpSession.createConsumer(destination); + tcpConnection.start(); + httpConnection.start(); + } + + @After + public void shutDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } +}