ARTEMIS-1938 Update proton-j to 0.30.0 and Qpid JMS 0.37.0

Update to latest proton-j release and refactor the dispostion code to use
the new type enums to better deal with the dispistions.  Updates to Qpid JMS
0.37.0 which still uses the current netty 4.1.28.Final dependency.
This commit is contained in:
Timothy Bish 2018-11-14 16:31:46 -05:00 committed by Clebert Suconic
parent 9263bb4355
commit 593348b9ad
3 changed files with 59 additions and 35 deletions

View File

@ -56,14 +56,13 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
@ -546,22 +545,45 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
Message message = ((MessageReference) delivery.getContext()).getMessage();
DeliveryState remoteState = delivery.getRemoteState();
boolean settleImmediate = true;
if (remoteState instanceof Accepted) {
if (remoteState != null && remoteState.getType() == DeliveryStateType.Accepted) {
// this can happen in the twice ack mode, that is the receiver accepts and settles separately
// acking again would show an exception but would have no negative effect but best to handle anyway.
if (delivery.isSettled()) {
return;
if (!delivery.isSettled()) {
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order from dealer, a performance hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
log.warn(e.toString(), e);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
delivery.settle();
}
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order from dealer, a performance hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
log.warn(e.toString(), e);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof TransactionalState) {
} else {
handleExtendedDeliveryOutcomes(message, delivery, remoteState);
}
if (!preSettle) {
protonSession.replaceTag(delivery.getTag());
}
} finally {
sessionSPI.afterIO(connectionFlusher);
sessionSPI.resetContext(oldContext);
}
}
private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
boolean settleImmediate = true;
boolean handled = true;
if (remoteState == null) {
log.debug("Received null disposition for delivery update: " + remoteState);
return true;
}
switch (remoteState.getType()) {
case Transactional:
// When the message arrives with a TransactionState disposition the ack should
// enlist the message into the transaction associated with the given txn ID.
TransactionalState txState = (TransactionalState) remoteState;
@ -587,19 +609,22 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
}
} else if (remoteState instanceof Released) {
break;
case Released:
try {
sessionSPI.cancel(brokerConsumer, message, false);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof Rejected) {
break;
case Rejected:
try {
sessionSPI.reject(brokerConsumer, message);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof Modified) {
break;
case Modified:
try {
Modified modification = (Modified) remoteState;
@ -615,23 +640,17 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}
} else {
break;
default:
log.debug("Received null or unknown disposition for delivery update: " + remoteState);
return;
}
if (!preSettle) {
protonSession.replaceTag(delivery.getTag());
}
if (settleImmediate) {
delivery.settle();
}
} finally {
sessionSPI.afterIO(connectionFlusher);
sessionSPI.resetContext(oldContext);
handled = false;
}
if (settleImmediate) {
delivery.settle();
}
return handled;
}
private final class ConnectionFlushIOCallback implements IOCallback {

View File

@ -85,6 +85,11 @@ public class NettyWritable implements WritableBuffer {
return nettyBuffer.maxCapacity() - nettyBuffer.writerIndex();
}
@Override
public void ensureRemaining(int remaining) {
nettyBuffer.ensureWritable(remaining);
}
@Override
public int position() {
return nettyBuffer.writerIndex();

View File

@ -92,10 +92,10 @@
<mockito.version>2.8.47</mockito.version>
<netty.version>4.1.28.Final</netty.version>
<netty-tcnative-version>2.0.12.Final</netty-tcnative-version>
<proton.version>0.29.0</proton.version>
<proton.version>0.30.0</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
<qpid.jms.version>0.36.0</qpid.jms.version>
<qpid.jms.version>0.37.0</qpid.jms.version>
<johnzon.version>0.9.5</johnzon.version>
<json-p.spec.version>1.0-alpha-1</json-p.spec.version>
<javax.inject.version>1</javax.inject.version>