Improve format of the amqp trace messages, implement better producer flow control.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1408953 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-11-13 20:52:18 +00:00
parent 914e177239
commit 7a0c1f85ab
2 changed files with 16 additions and 17 deletions

View File

@ -94,12 +94,12 @@ class AmqpProtocolConverter {
this.protonTransport.setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
System.out.println(String.format("RECV: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
System.out.println(String.format("%s | RECV: %s", amqpTransport.getRemoteAddress(), transportFrame.getBody()));
}
@Override
public void sentFrame(TransportFrame transportFrame) {
System.out.println(String.format("SENT: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
System.out.println(String.format("%s | SENT: %s", amqpTransport.getRemoteAddress(), transportFrame.getBody()));
}
});
@ -474,7 +474,7 @@ class AmqpProtocolConverter {
}
@Override
protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
current = null;
@ -494,13 +494,11 @@ class AmqpProtocolConverter {
message.setTransactionId(new LocalTransactionId(connectionId, txid));
}
ResponseHandler handler = null;
if( delivery.remotelySettled() ) {
delivery.settle();
} else {
handler = new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
message.onSend();
sendToActiveMQ(message, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if( !delivery.remotelySettled() ) {
if( response.isException() ) {
ExceptionResponse er = (ExceptionResponse)response;
Rejected rejected = new Rejected();
@ -509,14 +507,12 @@ class AmqpProtocolConverter {
rejected.setError(errors);
delivery.disposition(rejected);
}
delivery.settle();
pumpProtonToSocket();
}
};
}
message.onSend();
sendToActiveMQ(message, handler);
receiver.flow(1);
delivery.settle();
pumpProtonToSocket();
}
});
}
}

View File

@ -42,4 +42,7 @@ public interface AmqpTransport {
public void stop() throws Exception;
public String getTransformer();
public String getRemoteAddress();
}