mirror of https://github.com/apache/activemq.git
Fix for https://issues.apache.org/jira/browse/AMQ-5042. Handles receiving multiple frames at once from an nio channel
This commit is contained in:
parent
b97fa15d53
commit
da63f3f20a
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.amqp;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.EOFException;
|
||||
|
@ -29,7 +30,6 @@ import java.nio.channels.SocketChannel;
|
|||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.apache.activemq.transport.nio.NIOInputStream;
|
||||
import org.apache.activemq.transport.nio.NIOOutputStream;
|
||||
import org.apache.activemq.transport.nio.SelectorManager;
|
||||
import org.apache.activemq.transport.nio.SelectorSelection;
|
||||
|
@ -38,11 +38,15 @@ import org.apache.activemq.util.IOExceptionSupport;
|
|||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.fusesource.hawtbuf.Buffer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
|
||||
*/
|
||||
public class AmqpNioTransport extends TcpTransport {
|
||||
private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
|
||||
private final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
|
||||
|
||||
private SocketChannel channel;
|
||||
private SelectorSelection selection;
|
||||
|
@ -120,10 +124,28 @@ public class AmqpNioTransport extends TcpTransport {
|
|||
}
|
||||
}
|
||||
|
||||
doConsume(AmqpSupport.toBuffer(inputBuffer));
|
||||
while(inputBuffer.position() < inputBuffer.limit()) {
|
||||
inputBuffer.mark();
|
||||
int commandSize = inputBuffer.getInt();
|
||||
inputBuffer.reset();
|
||||
|
||||
// handles buffers starting with 'A','M','Q','P' rather than size
|
||||
if (commandSize == AMQP_HEADER_VALUE) {
|
||||
doConsume(AmqpSupport.toBuffer(inputBuffer));
|
||||
break;
|
||||
}
|
||||
|
||||
byte[] bytes = new byte[commandSize];
|
||||
ByteBuffer commandBuffer = ByteBuffer.allocate(commandSize);
|
||||
inputBuffer.get(bytes, 0, commandSize);
|
||||
commandBuffer.put(bytes);
|
||||
commandBuffer.flip();
|
||||
doConsume(AmqpSupport.toBuffer(commandBuffer));
|
||||
commandBuffer.clear();
|
||||
}
|
||||
|
||||
// clear the buffer
|
||||
inputBuffer.clear();
|
||||
|
||||
}
|
||||
} catch (IOException e) {
|
||||
onException(e);
|
||||
|
|
|
@ -18,117 +18,19 @@ package org.apache.activemq.transport.amqp;
|
|||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
|
||||
/**
|
||||
* Test the JMS client when connected to the NIO transport.
|
||||
*/
|
||||
public class JMSClientNioTest extends JMSClientTest {
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testProducerConsume() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testTransactedConsumer() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testRollbackRececeivedMessage() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testSelectors() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testProducerThrowsWhenBrokerStops() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testConsumerReceiveReturnsBrokerStops() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testBrokerRestartWontHangConnectionClose() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=120000)
|
||||
public void testProduceAndConsumeLargeNumbersOfMessages() throws JMSException {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testSyncSends() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testDurableConsumerAsync() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testDurableConsumerSync() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testTopicConsumerAsync() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=45000)
|
||||
public void testTopicConsumerSync() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=60000)
|
||||
public void testConnectionsAreClosed() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testExecptionListenerCalledOnBrokerStop() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test(timeout=30000)
|
||||
public void testSessionTransactedCommit() throws JMSException, InterruptedException {
|
||||
}
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientNioTest.class);
|
||||
|
||||
@Override
|
||||
protected int getBrokerPort() {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/** >>>>>> pumping
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -47,23 +47,36 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
|
|||
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.objectweb.jtests.jms.framework.TestConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class JMSClientTest extends AmqpTestSupport {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
|
||||
@Rule public TestName name = new TestName();
|
||||
java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
|
||||
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
LOG.debug("Starting test {}", name.getMethodName());
|
||||
LOG.debug("in setUp of {}", name.getMethodName());
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
LOG.debug("in tearDown of {}", name.getMethodName());
|
||||
super.tearDown();
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test(timeout=30000)
|
||||
public void testProducerConsume() throws Exception {
|
||||
|
@ -169,7 +182,7 @@ public class JMSClientTest extends AmqpTestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
@Test(timeout=60000)
|
||||
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
|
||||
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
@ -760,7 +773,9 @@ public class JMSClientTest extends AmqpTestSupport {
|
|||
|
||||
private Connection createConnection(String clientId, boolean syncPublish) throws JMSException {
|
||||
|
||||
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", getBrokerPort(), "admin", "password");
|
||||
int brokerPort = getBrokerPort();
|
||||
LOG.debug("Creating connection on port {}", brokerPort);
|
||||
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password");
|
||||
|
||||
factory.setSyncPublish(syncPublish);
|
||||
factory.setTopicPrefix("topic://");
|
||||
|
|
Loading…
Reference in New Issue