Fix for AMQ-5093. amqp+nio and amqp+nio+ssl were failing on large messages

This commit is contained in:
Kevin Earls 2014-03-11 12:00:16 +01:00
parent 570dbb4372
commit 87420cc455
5 changed files with 229 additions and 185 deletions

View File

@ -18,13 +18,8 @@ package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
@ -32,10 +27,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
public class AmqpNioSslTransport extends NIOSSLTransport {
private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
public final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
private static final Logger LOG = LoggerFactory.getLogger(AmqpNioSslTransport.class);
private boolean magicConsumed = false;
private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
@ -55,131 +47,6 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
@Override
protected void processCommand(ByteBuffer plain) throws Exception {
// Are we waiting for the next Command or are we building on the current one? The
// frame size is in the first 4 bytes.
if (nextFrameSize == -1) {
// We can get small packets that don't give us enough for the frame size
// so allocate enough for the initial size value and
if (plain.remaining() < 4) {
if (currentBuffer == null) {
currentBuffer = ByteBuffer.allocate(4);
amqpNioTransportHelper.processCommand(plain);
}
// Go until we fill the integer sized current buffer.
while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
currentBuffer.put(plain.get());
}
// Didn't we get enough yet to figure out next frame size.
if (currentBuffer.hasRemaining()) {
return;
} else {
currentBuffer.flip();
nextFrameSize = currentBuffer.getInt();
}
} else {
// Either we are completing a previous read of the next frame size or its
// fully contained in plain already.
if (currentBuffer != null) {
// Finish the frame size integer read and get from the current buffer.
while (currentBuffer.hasRemaining()) {
currentBuffer.put(plain.get());
}
currentBuffer.flip();
nextFrameSize = currentBuffer.getInt();
} else {
nextFrameSize = plain.getInt();
}
}
}
// There are three possibilities when we get here. We could have a partial frame,
// a full frame, or more than 1 frame
while (true) {
LOG.debug("Entering while loop with plain.position {} remaining {} ", plain.position(), plain.remaining());
// handle headers, which start with 'A','M','Q','P' rather than size
if (nextFrameSize == AMQP_HEADER_VALUE) {
nextFrameSize = handleAmqpHeader(plain);
if (nextFrameSize == -1) {
return;
}
}
validateFrameSize(nextFrameSize);
// now we have the data, let's reallocate and try to fill it, (currentBuffer.putInt() is called
// because we need to put back the 4 bytes we read to determine the size)
currentBuffer = ByteBuffer.allocate(nextFrameSize );
currentBuffer.putInt(nextFrameSize);
if (currentBuffer.remaining() >= plain.remaining()) {
currentBuffer.put(plain);
} else {
byte[] fill = new byte[currentBuffer.remaining()];
plain.get(fill);
currentBuffer.put(fill);
}
// Either we have enough data for a new command or we have to wait for some more. If hasRemaining is true,
// we have not filled the buffer yet, i.e. we haven't received the full frame.
if (currentBuffer.hasRemaining()) {
return;
} else {
currentBuffer.flip();
LOG.debug("Calling doConsume with position {} limit {}", currentBuffer.position(), currentBuffer.limit());
doConsume(AmqpSupport.toBuffer(currentBuffer));
// Determine if there are more frames to process
if (plain.hasRemaining()) {
if (plain.remaining() < 4) {
nextFrameSize = 4;
} else {
nextFrameSize = plain.getInt();
}
} else {
nextFrameSize = -1;
currentBuffer = null;
return;
}
}
}
}
private void validateFrameSize(int frameSize) throws IOException {
if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) {
throw new IOException("Frame size of " + nextFrameSize +
"larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
}
}
private int handleAmqpHeader(ByteBuffer plain) {
int nextFrameSize;
LOG.debug("Consuming AMQP_HEADER");
currentBuffer = ByteBuffer.allocate(8);
currentBuffer.putInt(AMQP_HEADER_VALUE);
while (currentBuffer.hasRemaining()) {
currentBuffer.put(plain.get());
}
currentBuffer.flip();
if (!magicConsumed) { // The first case we see is special and has to be handled differently
doConsume(new AmqpHeader(new Buffer(currentBuffer)));
magicConsumed = true;
} else {
doConsume(AmqpSupport.toBuffer(currentBuffer));
}
if (plain.hasRemaining()) {
if (plain.remaining() < 4) {
nextFrameSize = 4;
} else {
nextFrameSize = plain.getInt();
}
} else {
nextFrameSize = -1;
currentBuffer = null;
}
return nextFrameSize;
}
}

View File

@ -16,8 +16,17 @@
*/
package org.apache.activemq.transport.amqp;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
@ -28,26 +37,14 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
/**
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
*/
public class AmqpNioTransport extends TcpTransport {
private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
private final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransport.class);
private SocketChannel channel;
private SelectorSelection selection;
private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
private ByteBuffer inputBuffer;
@ -110,40 +107,7 @@ public class AmqpNioTransport extends TcpTransport {
receiveCounter += readSize;
inputBuffer.flip();
if( !magicRead ) {
if( inputBuffer.remaining()>= 8 ) {
magicRead = true;
Buffer magic = new Buffer(8);
for (int i = 0; i < 8; i++) {
magic.data[i] = inputBuffer.get();
}
doConsume(new AmqpHeader(magic));
} else {
inputBuffer.flip();
continue;
}
}
while(inputBuffer.position() < inputBuffer.limit()) {
inputBuffer.mark();
int commandSize = inputBuffer.getInt();
inputBuffer.reset();
// handles buffers starting with 'A','M','Q','P' rather than size
if (commandSize == AMQP_HEADER_VALUE) {
doConsume(AmqpSupport.toBuffer(inputBuffer));
break;
}
byte[] bytes = new byte[commandSize];
ByteBuffer commandBuffer = ByteBuffer.allocate(commandSize);
inputBuffer.get(bytes, 0, commandSize);
commandBuffer.put(bytes);
commandBuffer.flip();
doConsume(AmqpSupport.toBuffer(commandBuffer));
commandBuffer.clear();
}
amqpNioTransportHelper.processCommand(inputBuffer);
// clear the buffer
inputBuffer.clear();

View File

@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.TransportSupport;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
public class AmqpNioTransportHelper {
private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
private final Integer AMQP_HEADER_VALUE;
private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransportHelper.class);
protected int nextFrameSize = -1;
protected ByteBuffer currentBuffer;
private boolean magicConsumed = false;
private TransportSupport transportSupport;
public AmqpNioTransportHelper(TransportSupport transportSupport) throws IOException {
AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
this.transportSupport = transportSupport;
}
protected void processCommand(ByteBuffer plain) throws Exception {
// Are we waiting for the next Command or building on the current one? The frame size is in the first 4 bytes.
if (nextFrameSize == -1) {
// We can get small packets that don't give us enough for the frame size
// so allocate enough for the initial size value and
if (plain.remaining() < 4) {
if (currentBuffer == null) {
currentBuffer = ByteBuffer.allocate(4);
}
// Go until we fill the integer sized current buffer.
while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
currentBuffer.put(plain.get());
}
// Didn't we get enough yet to figure out next frame size.
if (currentBuffer.hasRemaining()) {
return;
} else {
currentBuffer.flip();
nextFrameSize = currentBuffer.getInt();
}
} else {
// Either we are completing a previous read of the next frame size or its
// fully contained in plain already.
if (currentBuffer != null) {
// Finish the frame size integer read and get from the current buffer.
while (currentBuffer.hasRemaining()) {
currentBuffer.put(plain.get());
}
currentBuffer.flip();
nextFrameSize = currentBuffer.getInt();
} else {
nextFrameSize = plain.getInt();
}
}
}
// There are three possibilities when we get here. We could have a partial frame,
// a full frame, or more than 1 frame
while (true) {
// handle headers, which start with 'A','M','Q','P' rather than size
if (nextFrameSize == AMQP_HEADER_VALUE) {
nextFrameSize = handleAmqpHeader(plain);
if (nextFrameSize == -1) {
return;
}
}
validateFrameSize(nextFrameSize);
// now we have the data, let's reallocate and try to fill it, (currentBuffer.putInt() is called TODO update
// because we need to put back the 4 bytes we read to determine the size)
if (currentBuffer == null || (currentBuffer.limit() == 4)) {
currentBuffer = ByteBuffer.allocate(nextFrameSize);
currentBuffer.putInt(nextFrameSize);
}
if (currentBuffer.remaining() >= plain.remaining()) {
currentBuffer.put(plain);
} else {
byte[] fill = new byte[currentBuffer.remaining()];
plain.get(fill);
currentBuffer.put(fill);
}
// Either we have enough data for a new command or we have to wait for some more. If hasRemaining is true,
// we have not filled the buffer yet, i.e. we haven't received the full frame.
if (currentBuffer.hasRemaining()) {
return;
} else {
currentBuffer.flip();
LOG.debug("Calling doConsume with position {} limit {}", currentBuffer.position(), currentBuffer.limit());
transportSupport.doConsume(AmqpSupport.toBuffer(currentBuffer));
currentBuffer = null;
nextFrameSize = -1;
// Determine if there are more frames to process
if (plain.hasRemaining()) {
if (plain.remaining() < 4) {
currentBuffer = ByteBuffer.allocate(4);
while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
currentBuffer.put(plain.get());
}
return;
} else {
nextFrameSize = plain.getInt();
}
} else {
return;
}
}
}
}
private void validateFrameSize(int frameSize) throws IOException {
if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) {
throw new IOException("Frame size of " + nextFrameSize +
"larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
}
}
private int handleAmqpHeader(ByteBuffer plain) {
int nextFrameSize;
LOG.debug("Consuming AMQP_HEADER");
currentBuffer = ByteBuffer.allocate(8);
currentBuffer.putInt(AMQP_HEADER_VALUE);
while (currentBuffer.hasRemaining()) {
currentBuffer.put(plain.get());
}
currentBuffer.flip();
if (!magicConsumed) { // The first case we see is special and has to be handled differently
transportSupport.doConsume(new AmqpHeader(new Buffer(currentBuffer)));
magicConsumed = true;
} else {
transportSupport.doConsume(AmqpSupport.toBuffer(currentBuffer));
}
if (plain.hasRemaining()) {
if (plain.remaining() < 4) {
nextFrameSize = 4;
} else {
nextFrameSize = plain.getInt();
}
} else {
nextFrameSize = -1;
currentBuffer = null;
}
return nextFrameSize;
}
}

View File

@ -784,6 +784,42 @@ public class JMSClientTest extends AmqpTestSupport {
connection.close();
}
private String createLargeString(int sizeInBytes) {
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
StringBuilder builder = new StringBuilder();
for (int i = 0; i < sizeInBytes; i++) {
builder.append(base[i % base.length]);
}
LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
return builder.toString();
}
@Test(timeout = 60 * 1000)
public void testSendLargeMessage() throws JMSException, InterruptedException {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = name.toString();
Queue queue = session.createQueue(queueName);
MessageProducer producer=session.createProducer(queue);
int messageSize = 1024 * 1024;
String messageText = createLargeString(messageSize);
Message m=session.createTextMessage(messageText);
LOG.debug("Sending message of {} bytes on queue {}", messageSize, queueName);
producer.send(m);
MessageConsumer consumer=session.createConsumer(queue);
Message message = consumer.receive();
assertNotNull(message);
assertTrue(message instanceof TextMessage);
TextMessage textMessage = (TextMessage) message;
LOG.debug(">>>> Received message of length {}", textMessage.getText().length());
assertEquals(messageSize, textMessage.getText().length());
assertEquals(messageText, textMessage.getText());
}
private Connection createConnection() throws JMSException {
return createConnection(name.toString(), false, false);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.joram;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
@ -41,6 +42,7 @@ import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest;
/**
* Run the JoramJmsTests using amqp+nio
*/
@Ignore("AMQ-5094")
@RunWith(Suite.class)
@Suite.SuiteClasses({
// TopicSessionTest.class, // Hangs, see https://issues.apache.org/jira/browse/PROTON-154