ARTEMIS-2067 Clean up some code in the AMQP protocol handling paths
Cleans up some of the code on the proton event handler, most noteable: 1. Fix IOCallback creation on each outbound send, use single instance as the handler only ever does a flush and has no attached state. 2. Fix redundent locking and unlocking of connection lock on the event path that already ensures that lock is held. 3. Set presettle state on the server sender at attach as it cannot change afterwards so checking on every message is not needed. 4. Improve buffer type checking on receive to reduce amount of work
This commit is contained in:
parent
a478eafe9f
commit
d1939620c0
|
@ -208,20 +208,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
* called when Proton receives a message to be delivered via a Delivery.
|
* called when Proton receives a message to be delivered via a Delivery.
|
||||||
*
|
*
|
||||||
* This may be called more than once per deliver so we have to cache the buffer until we have received it all.
|
* This may be called more than once per deliver so we have to cache the buffer until we have received it all.
|
||||||
*
|
*/
|
||||||
* */
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
|
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
|
||||||
Receiver receiver;
|
|
||||||
try {
|
try {
|
||||||
|
Receiver receiver = ((Receiver) delivery.getLink());
|
||||||
|
|
||||||
if (!delivery.isReadable()) {
|
if (receiver.current() != delivery) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (delivery.isAborted()) {
|
if (delivery.isAborted()) {
|
||||||
receiver = ((Receiver) delivery.getLink());
|
|
||||||
|
|
||||||
// Aborting implicitly remotely settles, so advance
|
// Aborting implicitly remotely settles, so advance
|
||||||
// receiver to the next delivery and settle locally.
|
// receiver to the next delivery and settle locally.
|
||||||
receiver.advance();
|
receiver.advance();
|
||||||
|
@ -233,16 +230,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
} else if (delivery.isPartial()) {
|
||||||
|
|
||||||
if (delivery.isPartial()) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
receiver = ((Receiver) delivery.getLink());
|
|
||||||
|
|
||||||
Transaction tx = null;
|
Transaction tx = null;
|
||||||
|
|
||||||
ReadableBuffer data = receiver.recv();
|
ReadableBuffer data = receiver.recv();
|
||||||
receiver.advance();
|
receiver.advance();
|
||||||
|
|
||||||
|
@ -267,13 +259,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
|
|
||||||
condition.setDescription(e.getMessage());
|
condition.setDescription(e.getMessage());
|
||||||
rejected.setError(condition);
|
rejected.setError(condition);
|
||||||
connection.lock();
|
|
||||||
try {
|
|
||||||
delivery.disposition(rejected);
|
delivery.disposition(rejected);
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
} finally {
|
|
||||||
connection.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,6 +87,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
private static final Symbol SHARED = Symbol.valueOf("shared");
|
private static final Symbol SHARED = Symbol.valueOf("shared");
|
||||||
private static final Symbol GLOBAL = Symbol.valueOf("global");
|
private static final Symbol GLOBAL = Symbol.valueOf("global");
|
||||||
|
|
||||||
|
private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();
|
||||||
|
|
||||||
private Consumer brokerConsumer;
|
private Consumer brokerConsumer;
|
||||||
|
|
||||||
protected final AMQPSessionContext protonSession;
|
protected final AMQPSessionContext protonSession;
|
||||||
|
@ -101,6 +103,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
private boolean shared = false;
|
private boolean shared = false;
|
||||||
private boolean global = false;
|
private boolean global = false;
|
||||||
private boolean isVolatile = false;
|
private boolean isVolatile = false;
|
||||||
|
private boolean preSettle;
|
||||||
private SimpleString tempQueueName;
|
private SimpleString tempQueueName;
|
||||||
|
|
||||||
public ProtonServerSenderContext(AMQPConnectionContext connection,
|
public ProtonServerSenderContext(AMQPConnectionContext connection,
|
||||||
|
@ -417,6 +420,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Detect if sender is in pre-settle mode.
|
||||||
|
preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
|
||||||
|
|
||||||
// We need to update the source with any filters we support otherwise the client
|
// We need to update the source with any filters we support otherwise the client
|
||||||
// is free to consider the attach as having failed if we don't send back what we
|
// is free to consider the attach as having failed if we don't send back what we
|
||||||
// do support or if we send something we don't support the client won't know we
|
// do support or if we send something we don't support the client won't know we
|
||||||
|
@ -538,17 +544,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Message message = ((MessageReference) delivery.getContext()).getMessage();
|
Message message = ((MessageReference) delivery.getContext()).getMessage();
|
||||||
|
DeliveryState remoteState = delivery.getRemoteState();
|
||||||
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
|
|
||||||
|
|
||||||
DeliveryState remoteState;
|
|
||||||
|
|
||||||
connection.lock();
|
|
||||||
try {
|
|
||||||
remoteState = delivery.getRemoteState();
|
|
||||||
} finally {
|
|
||||||
connection.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean settleImmediate = true;
|
boolean settleImmediate = true;
|
||||||
if (remoteState instanceof Accepted) {
|
if (remoteState instanceof Accepted) {
|
||||||
|
@ -558,8 +554,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// we have to individual ack as we can't guarantee we will get the delivery updates
|
// we have to individual ack as we can't guarantee we will get the delivery updates
|
||||||
// (including acks) in order
|
// (including acks) in order from dealer, a performance hit but a must
|
||||||
// from dealer, a perf hit but a must
|
|
||||||
try {
|
try {
|
||||||
sessionSPI.ack(null, brokerConsumer, message);
|
sessionSPI.ack(null, brokerConsumer, message);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -580,16 +575,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
TransactionalState txAccepted = new TransactionalState();
|
TransactionalState txAccepted = new TransactionalState();
|
||||||
txAccepted.setOutcome(Accepted.getInstance());
|
txAccepted.setOutcome(Accepted.getInstance());
|
||||||
txAccepted.setTxnId(txState.getTxnId());
|
txAccepted.setTxnId(txState.getTxnId());
|
||||||
connection.lock();
|
|
||||||
try {
|
|
||||||
delivery.disposition(txAccepted);
|
delivery.disposition(txAccepted);
|
||||||
} finally {
|
|
||||||
connection.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// we have to individual ack as we can't guarantee we will get the delivery
|
// we have to individual ack as we can't guarantee we will get the delivery
|
||||||
// updates (including acks) in order
|
// (including acks) in order from dealer, a performance hit but a must
|
||||||
// from dealer, a perf hit but a must
|
|
||||||
try {
|
try {
|
||||||
sessionSPI.ack(tx, brokerConsumer, message);
|
sessionSPI.ack(tx, brokerConsumer, message);
|
||||||
tx.addDelivery(delivery, this);
|
tx.addDelivery(delivery, this);
|
||||||
|
@ -636,11 +625,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
}
|
}
|
||||||
|
|
||||||
if (settleImmediate) {
|
if (settleImmediate) {
|
||||||
settle(delivery);
|
delivery.settle();
|
||||||
}
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
sessionSPI.afterIO(new IOCallback() {
|
sessionSPI.afterIO(connectionFlusher);
|
||||||
|
sessionSPI.resetContext(oldContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class ConnectionFlushIOCallback implements IOCallback {
|
||||||
@Override
|
@Override
|
||||||
public void done() {
|
public void done() {
|
||||||
connection.flush();
|
connection.flush();
|
||||||
|
@ -650,10 +644,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
public void onError(int errorCode, String errorMessage) {
|
public void onError(int errorCode, String errorMessage) {
|
||||||
connection.flush();
|
connection.flush();
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
sessionSPI.resetContext(oldContext);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void settle(Delivery delivery) {
|
public void settle(Delivery delivery) {
|
||||||
|
@ -681,16 +671,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
|
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
|
||||||
sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
|
sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
|
||||||
|
|
||||||
// presettle means we can settle the message on the dealer side before we send it, i.e.
|
|
||||||
// for browsers
|
|
||||||
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
|
|
||||||
|
|
||||||
// we only need a tag if we are going to settle later
|
// we only need a tag if we are going to settle later
|
||||||
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
|
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
|
||||||
|
|
||||||
// Let the Message decide how to present the message bytes
|
// Let the Message decide how to present the message bytes
|
||||||
boolean attemptRelease = true;
|
|
||||||
ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
|
ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
|
||||||
|
boolean releaseRequired = sendBuffer instanceof NettyReadable;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int size = sendBuffer.remaining();
|
int size = sendBuffer.remaining();
|
||||||
|
@ -713,14 +699,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
delivery.setMessageFormat((int) message.getMessageFormat());
|
delivery.setMessageFormat((int) message.getMessageFormat());
|
||||||
delivery.setContext(messageReference);
|
delivery.setContext(messageReference);
|
||||||
|
|
||||||
if (sendBuffer instanceof NettyReadable) {
|
if (releaseRequired) {
|
||||||
sender.send(sendBuffer);
|
sender.send(sendBuffer);
|
||||||
// Above send copied, so release now if needed
|
// Above send copied, so release now if needed
|
||||||
attemptRelease = false;
|
releaseRequired = false;
|
||||||
((NettyReadable) sendBuffer).getByteBuf().release();
|
((NettyReadable) sendBuffer).getByteBuf().release();
|
||||||
} else {
|
} else {
|
||||||
// Don't have pooled content, no need to release or copy.
|
// Don't have pooled content, no need to release or copy.
|
||||||
attemptRelease = false;
|
|
||||||
sender.sendNoCopy(sendBuffer);
|
sender.sendNoCopy(sendBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -731,6 +716,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
} else {
|
} else {
|
||||||
sender.advance();
|
sender.advance();
|
||||||
}
|
}
|
||||||
|
|
||||||
connection.flush();
|
connection.flush();
|
||||||
} finally {
|
} finally {
|
||||||
connection.unlock();
|
connection.unlock();
|
||||||
|
@ -738,7 +724,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
|
|
||||||
return size;
|
return size;
|
||||||
} finally {
|
} finally {
|
||||||
if (attemptRelease && sendBuffer instanceof NettyReadable) {
|
if (releaseRequired) {
|
||||||
((NettyReadable) sendBuffer).getByteBuf().release();
|
((NettyReadable) sendBuffer).getByteBuf().release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -243,25 +243,10 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
|
||||||
int capacity = transport.capacity();
|
int capacity = transport.capacity();
|
||||||
|
|
||||||
if (!receivedFirstPacket) {
|
if (!receivedFirstPacket) {
|
||||||
try {
|
handleFirstPacket(buffer);
|
||||||
byte auth = buffer.getByte(4);
|
// there is a chance that if SASL Handshake has been carried out that the capacity may change.
|
||||||
if (auth == SASL || auth == BARE) {
|
|
||||||
if (isServer) {
|
|
||||||
dispatchAuth(auth == SASL);
|
|
||||||
} else if (auth == BARE && clientSASLMechanism == null) {
|
|
||||||
dispatchAuthSuccess();
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
* there is a chance that if SASL Handshake has been carried out that the capacity may change.
|
|
||||||
* */
|
|
||||||
capacity = transport.capacity();
|
capacity = transport.capacity();
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
|
||||||
log.warn(e.getMessage(), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
receivedFirstPacket = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (capacity > 0) {
|
if (capacity > 0) {
|
||||||
ByteBuffer tail = transport.tail();
|
ByteBuffer tail = transport.tail();
|
||||||
|
@ -537,4 +522,21 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
|
||||||
sasl.client();
|
sasl.client();
|
||||||
sasl.setListener(this);
|
sasl.setListener(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleFirstPacket(ByteBuf buffer) {
|
||||||
|
try {
|
||||||
|
byte auth = buffer.getByte(4);
|
||||||
|
if (auth == SASL || auth == BARE) {
|
||||||
|
if (isServer) {
|
||||||
|
dispatchAuth(auth == SASL);
|
||||||
|
} else if (auth == BARE && clientSASLMechanism == null) {
|
||||||
|
dispatchAuthSuccess();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.warn(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
receivedFirstPacket = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,8 +73,6 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
||||||
ByteBuffer buffer;
|
ByteBuffer buffer;
|
||||||
MessageImpl msg;
|
MessageImpl msg;
|
||||||
|
|
||||||
connection.lock();
|
|
||||||
try {
|
|
||||||
// Replenish coordinator receiver credit on exhaustion so sender can continue
|
// Replenish coordinator receiver credit on exhaustion so sender can continue
|
||||||
// transaction declare and discahrge operations.
|
// transaction declare and discahrge operations.
|
||||||
if (receiver.getCredit() < amqpLowMark) {
|
if (receiver.getCredit() < amqpLowMark) {
|
||||||
|
@ -96,9 +94,6 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
||||||
receiver.advance();
|
receiver.advance();
|
||||||
|
|
||||||
msg = decodeMessage(buffer);
|
msg = decodeMessage(buffer);
|
||||||
} finally {
|
|
||||||
connection.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
Object action = ((AmqpValue) msg.getBody()).getValue();
|
Object action = ((AmqpValue) msg.getBody()).getValue();
|
||||||
if (action instanceof Declare) {
|
if (action instanceof Declare) {
|
||||||
|
@ -160,23 +155,13 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
||||||
}
|
}
|
||||||
} catch (ActiveMQAMQPException amqpE) {
|
} catch (ActiveMQAMQPException amqpE) {
|
||||||
log.warn(amqpE.getMessage(), amqpE);
|
log.warn(amqpE.getMessage(), amqpE);
|
||||||
connection.lock();
|
|
||||||
try {
|
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
|
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
|
||||||
} finally {
|
|
||||||
connection.unlock();
|
|
||||||
}
|
|
||||||
connection.flush();
|
connection.flush();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.warn(e.getMessage(), e);
|
log.warn(e.getMessage(), e);
|
||||||
connection.lock();
|
|
||||||
try {
|
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
|
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
|
||||||
} finally {
|
|
||||||
connection.unlock();
|
|
||||||
}
|
|
||||||
connection.flush();
|
connection.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,17 +49,19 @@ public class ProtonServerReceiverContextTest {
|
||||||
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
|
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
|
||||||
|
|
||||||
Delivery mockDelivery = mock(Delivery.class);
|
Delivery mockDelivery = mock(Delivery.class);
|
||||||
when(mockDelivery.isReadable()).thenReturn(true);
|
|
||||||
when(mockDelivery.isAborted()).thenReturn(true);
|
when(mockDelivery.isAborted()).thenReturn(true);
|
||||||
when(mockDelivery.isPartial()).thenReturn(true);
|
when(mockDelivery.isPartial()).thenReturn(true);
|
||||||
when(mockDelivery.getLink()).thenReturn(mockReceiver);
|
when(mockDelivery.getLink()).thenReturn(mockReceiver);
|
||||||
|
|
||||||
|
when(mockReceiver.current()).thenReturn(mockDelivery);
|
||||||
|
|
||||||
if (drain) {
|
if (drain) {
|
||||||
when(mockReceiver.getDrain()).thenReturn(true);
|
when(mockReceiver.getDrain()).thenReturn(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
rc.onMessage(mockDelivery);
|
rc.onMessage(mockDelivery);
|
||||||
|
|
||||||
|
verify(mockReceiver, times(1)).current();
|
||||||
verify(mockReceiver, times(1)).advance();
|
verify(mockReceiver, times(1)).advance();
|
||||||
verify(mockDelivery, times(1)).settle();
|
verify(mockDelivery, times(1)).settle();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue