ARTEMIS-1056 Performance improvements on AMQP

This commit is contained in:
Clebert Suconic 2017-03-21 11:43:06 -04:00 committed by Justin Bertram
parent ac7cafb210
commit 0bfb39bfb5
7 changed files with 102 additions and 82 deletions

View File

@ -65,7 +65,11 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg; ByteBuf buffer = (ByteBuf) msg;
handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer)); try {
handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
} finally {
buffer.release();
}
} }
@Override @Override

View File

@ -53,17 +53,17 @@ public class PartialPooledByteBufAllocator implements ByteBufAllocator {
@Override @Override
public ByteBuf ioBuffer() { public ByteBuf ioBuffer() {
return UNPOOLED.heapBuffer(); return POOLED.directBuffer();
} }
@Override @Override
public ByteBuf ioBuffer(int initialCapacity) { public ByteBuf ioBuffer(int initialCapacity) {
return UNPOOLED.heapBuffer(initialCapacity); return POOLED.directBuffer(initialCapacity);
} }
@Override @Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); return POOLED.directBuffer(initialCapacity, maxCapacity);
} }
@Override @Override

View File

@ -63,7 +63,7 @@ public class AMQPMessage extends RefCountMessage {
final long messageFormat; final long messageFormat;
ByteBuf data; ByteBuf data;
boolean bufferValid; boolean bufferValid;
boolean durable; Boolean durable;
long messageID; long messageID;
String address; String address;
MessageImpl protonMessage; MessageImpl protonMessage;
@ -491,11 +491,16 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public boolean isDurable() { public boolean isDurable() {
if (getHeader() != null && getHeader().getDurable() != null) { if (durable != null) {
return getHeader().getDurable().booleanValue();
} else {
return durable; return durable;
} }
if (getHeader() != null && getHeader().getDurable() != null) {
durable = getHeader().getDurable().booleanValue();
return durable;
} else {
return durable != null ? durable : false;
}
} }
@Override @Override

View File

@ -481,89 +481,100 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return; return;
} }
Message message = ((MessageReference) delivery.getContext()).getMessage(); try {
Message message = ((MessageReference) delivery.getContext()).getMessage();
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
DeliveryState remoteState = delivery.getRemoteState(); DeliveryState remoteState;
boolean settleImmediate = true; synchronized (connection.getLock()) {
if (remoteState != null) { remoteState = delivery.getRemoteState();
// If we are transactional then we need ack if the msg has been accepted }
if (remoteState instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) remoteState; boolean settleImmediate = true;
ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false); if (remoteState != null) {
// If we are transactional then we need ack if the msg has been accepted
if (remoteState instanceof TransactionalState) {
if (txState.getOutcome() != null) { TransactionalState txState = (TransactionalState) remoteState;
settleImmediate = false; ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
Outcome outcome = txState.getOutcome();
if (outcome instanceof Accepted) { if (txState.getOutcome() != null) {
if (!delivery.remotelySettled()) { settleImmediate = false;
TransactionalState txAccepted = new TransactionalState(); Outcome outcome = txState.getOutcome();
txAccepted.setOutcome(Accepted.getInstance()); if (outcome instanceof Accepted) {
txAccepted.setTxnId(txState.getTxnId()); if (!delivery.remotelySettled()) {
delivery.disposition(txAccepted); TransactionalState txAccepted = new TransactionalState();
} txAccepted.setOutcome(Accepted.getInstance());
// we have to individual ack as we can't guarantee we will get the delivery txAccepted.setTxnId(txState.getTxnId());
// updates (including acks) in order synchronized (connection.getLock()) {
// from dealer, a perf hit but a must delivery.disposition(txAccepted);
try { }
sessionSPI.ack(tx, brokerConsumer, message); }
tx.addDelivery(delivery, this); // we have to individual ack as we can't guarantee we will get the delivery
} catch (Exception e) { // updates (including acks) in order
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); // from dealer, a perf hit but a must
try {
sessionSPI.ack(tx, brokerConsumer, message);
tx.addDelivery(delivery, this);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
} }
} }
} } else if (remoteState instanceof Accepted) {
} else if (remoteState instanceof Accepted) { //this can happen in the twice ack mode, that is the receiver accepts and settles separately
//this can happen in the twice ack mode, that is the receiver accepts and settles separately //acking again would show an exception but would have no negative effect but best to handle anyway.
//acking again would show an exception but would have no negative effect but best to handle anyway. if (delivery.isSettled()) {
if (delivery.isSettled()) { return;
return; }
} // we have to individual ack as we can't guarantee we will get the delivery updates
// we have to individual ack as we can't guarantee we will get the delivery updates // (including acks) in order
// (including acks) in order // from dealer, a perf hit but a must
// from dealer, a perf hit but a must try {
try { sessionSPI.ack(null, brokerConsumer, message);
sessionSPI.ack(null, brokerConsumer, message); } catch (Exception e) {
} catch (Exception e) { log.warn(e.toString(), e);
log.warn(e.toString(), e); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); }
} } else if (remoteState instanceof Released) {
} else if (remoteState instanceof Released) { try {
try {
sessionSPI.cancel(brokerConsumer, message, false);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof Rejected) {
try {
sessionSPI.cancel(brokerConsumer, message, true);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof Modified) {
try {
Modified modification = (Modified) remoteState;
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
sessionSPI.cancel(brokerConsumer, message, true);
} else {
sessionSPI.cancel(brokerConsumer, message, false); sessionSPI.cancel(brokerConsumer, message, false);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof Rejected) {
try {
sessionSPI.cancel(brokerConsumer, message, true);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof Modified) {
try {
Modified modification = (Modified) remoteState;
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
sessionSPI.cancel(brokerConsumer, message, true);
} else {
sessionSPI.cancel(brokerConsumer, message, false);
}
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
} }
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
} }
} // todo add tag caching
// todo add tag caching if (!preSettle) {
if (!preSettle) { protonSession.replaceTag(delivery.getTag());
protonSession.replaceTag(delivery.getTag()); }
}
if (settleImmediate) settle(delivery); if (settleImmediate)
settle(delivery);
} else { } else {
// todo not sure if we need to do anything here // todo not sure if we need to do anything here
}
} finally {
connection.flush();
} }
} }

View File

@ -240,7 +240,7 @@ public class ProtonHandler extends ProtonInitializable {
} }
// For returning PooledBytes // For returning PooledBytes
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size); ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
ByteBuffer head = transport.head(); ByteBuffer head = transport.head();
head.position(offset); head.position(offset);
head.limit(offset + size); head.limit(offset + size);

View File

@ -49,7 +49,7 @@ class HornetQProtocolManager extends CoreProtocolManager {
buffer.getByte(5) == 'T' && buffer.getByte(5) == 'T' &&
buffer.getByte(6) == 'Q') { buffer.getByte(6) == 'Q') {
//todo add some handshaking //todo add some handshaking
buffer.readBytes(7); buffer.skipBytes(7);
} }
} }

View File

@ -174,7 +174,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
//if we are not an old client then handshake //if we are not an old client then handshake
if (isArtemis(buffer)) { if (isArtemis(buffer)) {
buffer.readBytes(7); buffer.skipBytes(7);
} }
} }