Set session incoming credit value and improves the flow handling to
reduce chatter on each message send, improves overall producer
performance significantly.
This commit is contained in:
Timothy Bish 2014-05-20 18:26:17 -04:00
parent b9d51bf1d4
commit 9b6f419d44
1 changed files with 25 additions and 8 deletions

View File

@ -170,6 +170,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
while (!done) { while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer(); ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) { if (toWrite != null && toWrite.hasRemaining()) {
LOG.trace("Sending {} bytes out", toWrite.limit());
amqpTransport.sendToAmqp(toWrite); amqpTransport.sendToAmqp(toWrite);
protonTransport.outputConsumed(); protonTransport.outputConsumed();
} else { } else {
@ -463,6 +464,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++); AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++);
session.setContext(sessionContext); session.setContext(sessionContext);
sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null); sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null);
session.setIncomingCapacity(AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE * prefetch);
session.open(); session.open();
} }
@ -608,10 +610,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
message.onSend(); message.onSend();
if (!delivery.remotelySettled()) {
sendToActiveMQ(message, new ResponseHandler() { sendToActiveMQ(message, new ResponseHandler() {
@Override @Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (!delivery.remotelySettled()) {
if (response.isException()) { if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response; ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected(); Rejected rejected = new Rejected();
@ -620,14 +622,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
condition.setDescription(er.getException().getMessage()); condition.setDescription(er.getException().getMessage());
rejected.setError(condition); rejected.setError(condition);
delivery.disposition(rejected); delivery.disposition(rejected);
} else {
if (receiver.getCredit() <= (prefetch * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}",
prefetch - receiver.getCredit(), producerId);
receiver.flow(prefetch - receiver.getCredit());
} }
}
receiver.flow(1);
delivery.disposition(Accepted.getInstance()); delivery.disposition(Accepted.getInstance());
delivery.settle(); delivery.settle();
}
pumpProtonToSocket(); pumpProtonToSocket();
} }
}); });
} else {
if (receiver.getCredit() <= (prefetch * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}",
prefetch - receiver.getCredit(), producerId);
receiver.flow(prefetch - receiver.getCredit());
pumpProtonToSocket();
}
sendToActiveMQ(message, null);
}
} }
} }