AMQP: Sending and receiving large number of messages was broken.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1394266 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-04 21:00:36 +00:00
parent cac8c9c385
commit 72d72ab79e
4 changed files with 53 additions and 27 deletions

View File

@ -305,11 +305,18 @@ class AmqpProtocolConverter {
@Override @Override
public void onDelivery(Delivery delivery) throws Exception { public void onDelivery(Delivery delivery) throws Exception {
if( current ==null ) { Receiver receiver = ((Receiver)delivery.getLink());
if( !delivery.isReadable() ) {
System.out.println("it was not readable!");
// delivery.settle();
// receiver.advance();
return;
}
if( current==null ) {
current = new ByteArrayOutputStream(); current = new ByteArrayOutputStream();
} }
Receiver receiver = ((Receiver)delivery.getLink());
int count; int count;
byte data[] = new byte[1024*4]; byte data[] = new byte[1024*4];
while( (count = receiver.recv(data, 0, data.length)) > 0 ) { while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
@ -321,6 +328,9 @@ class AmqpProtocolConverter {
return; return;
} }
receiver.advance();
delivery.settle();
final Buffer buffer = current.toBuffer(); final Buffer buffer = current.toBuffer();
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length); EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em); final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
@ -410,18 +420,21 @@ class AmqpProtocolConverter {
pumpProtonToSocket(); pumpProtonToSocket();
} }
Buffer current; Buffer currentBuffer;
Delivery currentDelivery;
public void pumpOutbound() { public void pumpOutbound() {
while(true) { while(true) {
while( current!=null ) { while( currentBuffer !=null ) {
int sent = sender.send(current.data, current.offset, current.length); int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
if( sent > 0 ) { if( sent > 0 ) {
current.moveHead(sent); currentBuffer.moveHead(sent);
if( current.length == 0 ) { if( currentBuffer.length == 0 ) {
currentDelivery.settle();
sender.advance(); sender.advance();
current = null; currentBuffer = null;
currentDelivery = null;
} }
} else { } else {
return; return;
@ -438,10 +451,10 @@ class AmqpProtocolConverter {
final EncodedMessage amqp = outboundTransformer.transform(jms); final EncodedMessage amqp = outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) { if( amqp!=null && amqp.getLength() > 0 ) {
current = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength()); currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
final byte[] tag = nextTag(); final byte[] tag = nextTag();
final Delivery delivery = sender.delivery(tag, 0, tag.length); currentDelivery = sender.delivery(tag, 0, tag.length);
delivery.setContext(md); currentDelivery.setContext(md);
} else { } else {
// TODO: message could not be generated what now? // TODO: message could not be generated what now?

View File

@ -59,7 +59,9 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
public void oneway(Object o) throws IOException { public void oneway(Object o) throws IOException {
try { try {
final Command command = (Command) o; final Command command = (Command) o;
synchronized (protocolConverter) {
protocolConverter.onActiveMQCommand(command); protocolConverter.onActiveMQCommand(command);
}
} catch (Exception e) { } catch (Exception e) {
throw IOExceptionSupport.create(e); throw IOExceptionSupport.create(e);
} }
@ -70,8 +72,9 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
if (trace) { if (trace) {
TRACE.trace("Received: \n" + command); TRACE.trace("Received: \n" + command);
} }
synchronized (protocolConverter) {
protocolConverter.onAMQPData((Buffer) command); protocolConverter.onAMQPData((Buffer) command);
}
} catch (IOException e) { } catch (IOException e) {
handleException(e); handleException(e);
} catch (JMSException e) { } catch (JMSException e) {

View File

@ -52,7 +52,9 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
Message rc; Message rc;
final Section body = amqp.getBody(); final Section body = amqp.getBody();
if( body instanceof Data ) { if( body == null ) {
rc = vendor.createMessage();
} else if( body instanceof Data ) {
Binary d = ((Data) body).getValue(); Binary d = ((Data) body).getValue();
BytesMessage m = vendor.createBytesMessage(); BytesMessage m = vendor.createBytesMessage();
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
@ -91,7 +93,7 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
// jms = m; // jms = m;
} }
} else { } else {
throw new RuntimeException("Unexpected body type."); throw new RuntimeException("Unexpected body type: "+body.getClass());
} }
rc.setJMSDeliveryMode(defaultDeliveryMode); rc.setJMSDeliveryMode(defaultDeliveryMode);
rc.setJMSPriority(defaultPriority); rc.setJMSPriority(defaultPriority);

View File

@ -34,7 +34,9 @@ public class SwiftMQClientTest extends AmqpTestSupport {
public void testSendReceive() throws Exception { public void testSendReceive() throws Exception {
String queue = "testqueue"; String queue = "testqueue";
int nMsgs = 1; int nMsgs = 100;
final String dataFormat = "%01024d";
int qos = QoS.AT_MOST_ONCE; int qos = QoS.AT_MOST_ONCE;
AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT); AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
@ -57,28 +59,34 @@ public class SwiftMQClientTest extends AmqpTestSupport {
for (int i = 0; i < nMsgs; i++) { for (int i = 0; i < nMsgs; i++) {
AMQPMessage msg = new AMQPMessage(); AMQPMessage msg = new AMQPMessage();
System.out.println("Sending " + i); System.out.println("Sending " + i);
msg.setAmqpValue(new AmqpValue(new AMQPString(String.format("%010d", i)))); msg.setAmqpValue(new AmqpValue(new AMQPString(String.format(dataFormat, i))));
p.send(msg); p.send(msg);
} }
p.close(); p.close();
session.close(); session.close();
} }
System.out.println("=======================================================================================");
System.out.println(" receiving ");
System.out.println("=======================================================================================");
{ {
Session session = connection.createSession(10, 10); Session session = connection.createSession(10, 10);
Consumer c = session.createConsumer(queue, 100, qos, true, null); Consumer c = session.createConsumer(queue, 100, qos, true, null);
// Receive messages non-transacted // Receive messages non-transacted
for (int i = 0; i < nMsgs; i++) { int i = 0;
while ( i < nMsgs) {
AMQPMessage msg = c.receive(); AMQPMessage msg = c.receive();
if( msg!=null ) {
final AMQPType value = msg.getAmqpValue().getValue(); final AMQPType value = msg.getAmqpValue().getValue();
if (value instanceof AMQPString) { if (value instanceof AMQPString) {
String s = ((AMQPString) value).getValue(); String s = ((AMQPString) value).getValue();
assertEquals(String.format("%010d", i), s); assertEquals(String.format(dataFormat, i), s);
System.out.println("Received: " + i); System.out.println("Received: " + i);
} }
if (!msg.isSettled()) if (!msg.isSettled())
msg.accept(); msg.accept();
i++;
}
} }
c.close(); c.close();
session.close(); session.close();