Add some more asserts to the test and clean up logging in the protocol converter, replace all the System.out.println calls

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1492250 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-06-12 15:41:56 +00:00
parent 0168a826f7
commit 3513728f0f
1 changed files with 22 additions and 8 deletions

View File

@ -130,12 +130,18 @@ class AmqpProtocolConverter {
this.protonTransport.setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
System.out.println(String.format("%s | RECV: %s", amqpTransport.getRemoteAddress(), transportFrame.getBody()));
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("%s | RECV: %s",
amqpTransport.getRemoteAddress(), transportFrame.getBody()));
}
}
@Override
public void sentFrame(TransportFrame transportFrame) {
System.out.println(String.format("%s | SENT: %s", amqpTransport.getRemoteAddress(), transportFrame.getBody()));
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("%s | SENT: %s",
amqpTransport.getRemoteAddress(), transportFrame.getBody()));
}
}
});
}
@ -418,7 +424,9 @@ class AmqpProtocolConverter {
private void onSessionClose(Session session) {
AmqpSessionContext sessionContext = (AmqpSessionContext) session.getContext();
if (sessionContext != null) {
System.out.println(sessionContext.sessionId);
if (LOG.isTraceEnabled()) {
LOG.trace("Session {} closed", sessionContext.sessionId);
}
sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
session.setContext(null);
}
@ -464,7 +472,7 @@ class AmqpProtocolConverter {
public void onDelivery(Delivery delivery) throws Exception {
Receiver receiver = ((Receiver) delivery.getLink());
if (!delivery.isReadable()) {
System.out.println("it was not readable!");
LOG.debug("Delivery was not readable!");
return;
}
@ -588,7 +596,7 @@ class AmqpProtocolConverter {
}
Object action = ((AmqpValue) msg.getBody()).getValue();
System.out.println("COORDINATOR received: " + action + ", [" + buffer + "]");
LOG.debug("COORDINATOR received: " + action + ", [" + buffer + "]");
if (action instanceof Declare) {
Declare declare = (Declare) action;
if (declare.getGlobalId() != null) {
@ -598,7 +606,9 @@ class AmqpProtocolConverter {
long txid = nextTransactionId++;
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
sendToActiveMQ(txinfo, null);
System.out.println("started transaction " + txid);
if (LOG.isTraceEnabled()) {
LOG.trace("started transaction " + txid);
}
Declared declared = new Declared();
declared.setTxnId(new Binary(toBytes(txid)));
@ -610,10 +620,14 @@ class AmqpProtocolConverter {
byte operation;
if (discharge.getFail()) {
System.out.println("rollback transaction " + txid);
if (LOG.isTraceEnabled()) {
LOG.trace("rollback transaction " + txid);
}
operation = TransactionInfo.ROLLBACK;
} else {
System.out.println("commit transaction " + txid);
if (LOG.isTraceEnabled()) {
LOG.trace("commit transaction " + txid);
}
operation = TransactionInfo.COMMIT_ONE_PHASE;
}
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);