git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1188839 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-25 18:03:37 +00:00
parent 07eef8e293
commit ef850895a2
6 changed files with 428 additions and 10 deletions

View File

@ -20,22 +20,30 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.entity.ByteArrayEntity;
@ -45,6 +53,7 @@ import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.message.AbstractHttpMessage;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,6 +79,10 @@ public class HttpClientTransport extends HttpTransportSupport {
private int soTimeout = MAX_CLIENT_TIMEOUT;
private boolean useCompression = false;
private boolean canSendCompressed = false;
private int minSendAsCompressedSize = 0;
public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
super(wireFormat, remoteUrl);
}
@ -87,6 +100,17 @@ public class HttpClientTransport extends HttpTransportSupport {
configureMethod(httpMethod);
String data = getTextWireFormat().marshalText(command);
byte[] bytes = data.getBytes("UTF-8");
if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
stream.write(bytes);
stream.close();
httpMethod.addHeader("Content-Type", "application/x-gzip");
if (LOG.isTraceEnabled()) {
LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size());
}
bytes = bytesOut.toByteArray();
}
ByteArrayEntity entity = new ByteArrayEntity(bytes);
httpMethod.setEntity(entity);
@ -121,9 +145,20 @@ public class HttpClientTransport extends HttpTransportSupport {
return null;
}
private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
Header encoding = answer.getEntity().getContentEncoding();
if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
} else {
return new DataInputStream(answer.getEntity().getContent());
}
}
public void run() {
LOG.trace("HTTP GET consumer thread starting: " + this);
if (LOG.isTraceEnabled()) {
LOG.trace("HTTP GET consumer thread starting: " + this);
}
HttpClient httpClient = getReceiveHttpClient();
URI remoteUrl = getRemoteUrl();
@ -151,7 +186,7 @@ public class HttpClientTransport extends HttpTransportSupport {
}
} else {
receiveCounter++;
DataInputStream stream = new DataInputStream(answer.getEntity().getContent());
DataInputStream stream = createDataInputStream(answer);
Object command = (Object)getTextWireFormat().unmarshal(stream);
if (command == null) {
LOG.debug("Received null command from url: " + remoteUrl);
@ -202,15 +237,40 @@ public class HttpClientTransport extends HttpTransportSupport {
// -------------------------------------------------------------------------
protected void doStart() throws Exception {
LOG.trace("HTTP GET consumer thread starting: " + this);
if (LOG.isTraceEnabled()) {
LOG.trace("HTTP GET consumer thread starting: " + this);
}
HttpClient httpClient = getReceiveHttpClient();
URI remoteUrl = getRemoteUrl();
HttpHead httpMethod = new HttpHead(remoteUrl.toString());
configureMethod(httpMethod);
ResponseHandler<String> handler = new BasicResponseHandler();
// Request the options from the server so we can find out if the broker we are
// talking to supports GZip compressed content. If so and useCompression is on
// then we can compress our POST data, otherwise we must send it uncompressed to
// ensure backwards compatibility.
HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString());
ResponseHandler<String> handler = new BasicResponseHandler() {
@Override
public String handleResponse(HttpResponse response) throws HttpResponseException, IOException {
for(Header header : response.getAllHeaders()) {
if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) {
LOG.info("Broker Servlet supports GZip compression.");
canSendCompressed = true;
break;
}
}
return super.handleResponse(response);
}
};
try {
httpClient.execute(httpMethod, handler);
httpClient.execute(httpMethod, new BasicResponseHandler());
httpClient.execute(optionsMethod, handler);
} catch(Exception e) {
throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage());
}
@ -226,6 +286,15 @@ public class HttpClientTransport extends HttpTransportSupport {
protected HttpClient createHttpClient() {
DefaultHttpClient client = new DefaultHttpClient(new ThreadSafeClientConnManager());
if (useCompression) {
client.addRequestInterceptor( new HttpRequestInterceptor() {
@Override
public void process(HttpRequest request, HttpContext context) {
// We expect to received a compression response that we un-gzip
request.addHeader("Accept-Encoding", "gzip");
}
});
}
if (getProxyHost() != null) {
HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
@ -262,4 +331,30 @@ public class HttpClientTransport extends HttpTransportSupport {
public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
}
public void setUseCompression(boolean useCompression) {
this.useCompression = useCompression;
}
public boolean isUseCompression() {
return this.useCompression;
}
public int getMinSendAsCompressedSize() {
return minSendAsCompressedSize;
}
/**
* Sets the minimum size that must be exceeded on a send before compression is used if
* the useCompression option is specified. For very small payloads compression can be
* inefficient compared to the transmission size savings.
*
* Default value is 0.
*
* @param minSendAsCompressedSize
*/
public void setMinSendAsCompressedSize(int minSendAsCompressedSize) {
this.minSendAsCompressedSize = minSendAsCompressedSize;
}
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.util.ServiceStopper;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.GzipHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
@ -88,6 +89,10 @@ public class HttpTransportServer extends TransportServerSupport {
contextHandler.setAttribute("wireFormat", getWireFormat());
contextHandler.setAttribute("transportFactory", transportFactory);
contextHandler.setAttribute("transportOptions", transportOptions);
GzipHandler gzipHandler = new GzipHandler();
contextHandler.setHandler(gzipHandler);
server.start();
}

View File

@ -19,14 +19,16 @@ package org.apache.activemq.transport.http;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -77,6 +79,12 @@ public class HttpTunnelServlet extends HttpServlet {
}
}
@Override
protected void doOptions(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
response.addHeader("Accepts-Encoding", "gzip");
super.doOptions(request, response);
}
@Override
protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
createTransportChannel(request, response);
@ -107,12 +115,16 @@ public class HttpTunnelServlet extends HttpServlet {
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
IOException {
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
InputStream stream = request.getInputStream();
String contentType = request.getContentType();
if (contentType != null && contentType.equals("application/x-gzip")) {
stream = new GZIPInputStream(stream);
}
// Read the command directly from the reader, assuming UTF8 encoding
ServletInputStream sis = request.getInputStream();
Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(sis, "UTF-8"));
Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(stream, "UTF-8"));
if (command instanceof WireFormatInfo) {
WireFormatInfo info = (WireFormatInfo) command;

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.http;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class HttpJMSMessagesWithCompressionTest {
private static final AtomicInteger counter = new AtomicInteger(1);
enum DESTINATION_TYPE { TOPIC, QUEUE };
protected BrokerService broker;
protected Connection connection;
protected DESTINATION_TYPE destinationType = DESTINATION_TYPE.QUEUE;
@Before
public void setUp() throws Exception {
broker = createBroker();
broker.start();
WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL());
ConnectionFactory factory = createConnectionFactory();
connection = factory.createConnection();
}
@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
if (broker != null) {
broker.stop();
}
}
protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(getBrokerURL());
return factory;
}
protected String getBrokerURL() {
return "http://localhost:8161?useCompression=true";
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
answer.setUseJmx(false);
answer.setManagementContext(null);
answer.addConnector(getBrokerURL());
return answer;
}
protected Destination createDestination(Session session, DESTINATION_TYPE destinationType) throws JMSException {
switch(destinationType) {
case TOPIC:
return session.createTopic("TOPIC." + counter.getAndIncrement());
case QUEUE:
return session.createQueue("QUEUE." + counter.getAndIncrement());
}
Assert.fail("Invalid destination type: " + destinationType);
return null;
}
abstract class MessageCommand<M extends Message> {
public final void assertMessage(M message) throws JMSException {
Assert.assertNotNull(message);
completeCheck(message);
}
public abstract void completeCheck(M message) throws JMSException;
public abstract M createMessage(Session session) throws JMSException;
}
@SuppressWarnings("unchecked")
private <E extends Message> void executeTest(MessageCommand<E> messageCommand) throws JMSException {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
{
E message = messageCommand.createMessage(session);
producer.send(message);
}
{
E message = (E)consumer.receive(1000);
messageCommand.assertMessage(message);
}
Assert.assertNull(consumer.receiveNoWait());
}
@Test
public void testTextMessage() throws Exception {
executeTest(new MessageCommand<TextMessage>() {
private String textString = "This is a simple text string";
public TextMessage createMessage(Session session) throws JMSException {
return session.createTextMessage(textString);
}
public void completeCheck(TextMessage message) throws JMSException {
Assert.assertEquals("The returned text string was different", textString, message.getText());
}
});
}
@Test
public void testBytesMessage() throws Exception {
executeTest(new MessageCommand<BytesMessage>() {
private byte[] bytes = "This is a simple text string".getBytes();
public BytesMessage createMessage(Session session) throws JMSException {
BytesMessage message = session.createBytesMessage();
message.writeBytes(bytes);
return message;
}
public void completeCheck(BytesMessage message) throws JMSException {
byte[] result = new byte[bytes.length];
message.readBytes(result);
Assert.assertArrayEquals("The returned byte array was different", bytes, result);
}
});
}
@Test
public void testMapMessage() throws Exception {
executeTest(new MessageCommand<MapMessage>() {
public MapMessage createMessage(Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setInt("value", 13);
return message;
}
public void completeCheck(MapMessage message) throws JMSException {
Assert.assertEquals("The returned mapped value was different", 13, message.getInt("value"));
}
});
}
@Test
public void testObjectMessage() throws Exception {
executeTest(new MessageCommand<ObjectMessage>() {
private Long value = new Long(101);
public ObjectMessage createMessage(Session session) throws JMSException {
return session.createObjectMessage(value);
}
public void completeCheck(ObjectMessage message) throws JMSException {
Assert.assertEquals("The returned object was different", value, message.getObject());
}
});
}
@Test
public void testStreamMessage() throws Exception {
executeTest(new MessageCommand<StreamMessage>() {
private Long value = new Long(1013);
public StreamMessage createMessage(Session session) throws JMSException {
StreamMessage message = session.createStreamMessage();
message.writeObject(value);
return message;
}
public void completeCheck(StreamMessage message) throws JMSException {
Assert.assertEquals("The returned stream object was different", value, message.readObject());
}
});
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.http;
import java.util.List;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests the Wire Level Http GZip compression.
*/
public class HttpJmsSendAndReceiveWithCompressionTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
private static final Logger logger = LoggerFactory.getLogger(HttpJmsSendAndReceiveWithCompressionTest.class);
protected BrokerService broker;
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
broker.start();
}
super.setUp();
WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL());
}
protected void tearDown() throws Exception {
super.tearDown();
if (broker != null) {
broker.stop();
}
}
protected ActiveMQConnectionFactory createConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL());
return connectionFactory;
}
protected String getBrokerURL() {
return "http://localhost:8161?useCompression=true";
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
answer.addConnector(getBrokerURL());
return answer;
}
protected void consumeMessage(Message message, List<Message> messageList) {
super.consumeMessage(message, messageList);
if (message instanceof TextMessage) {
TextMessage textMessage = TextMessage.class.cast(message);
try {
logger.debug("Received text message with text: {}", textMessage.getText());
} catch( javax.jms.JMSException jmsE) {
logger.debug("Received an exception while trying to retrieve the text message", jmsE);
throw new RuntimeException(jmsE);
}
} else {
logger.debug("Received a non text message: {}", message);
}
}
}

View File

@ -44,6 +44,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test covers the Message Compression feature of the ActiveMQConnectionFactory.setUseCompression
* and has no relation to Http transport level compression. The Messages are compressed using the
* deflate algorithm by the ActiveMQ layer before marshalled to XML so only the Message body will
* be compressed.
*/
public class HttpSendCompressedMessagesTest {
private static final Logger LOG = LoggerFactory.getLogger(HttpSendCompressedMessagesTest.class);