ARTEMIS-1038 Make usage of Delivery.available and upgrade proton
This commit is contained in:
parent
9e6c40a8de
commit
ae34b01065
|
@ -19,8 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.buffer.PooledByteBufAllocator;
|
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||||
|
@ -28,7 +26,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
|
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
|
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||||
|
@ -134,7 +131,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
|
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
|
||||||
Receiver receiver;
|
Receiver receiver;
|
||||||
ByteBuf buffer = null;
|
|
||||||
try {
|
try {
|
||||||
receiver = ((Receiver) delivery.getLink());
|
receiver = ((Receiver) delivery.getLink());
|
||||||
|
|
||||||
|
@ -145,20 +141,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
if (delivery.isPartial()) {
|
if (delivery.isPartial()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// This should be used if getDataLength was avilable
|
|
||||||
// byte[] data = new byte[delivery.getDataLength()];
|
|
||||||
|
|
||||||
buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024);
|
|
||||||
Transaction tx = null;
|
Transaction tx = null;
|
||||||
|
|
||||||
|
byte[] data;
|
||||||
|
|
||||||
synchronized (connection.getLock()) {
|
synchronized (connection.getLock()) {
|
||||||
DeliveryUtil.readDelivery(receiver, buffer);
|
data = new byte[delivery.available()];
|
||||||
|
receiver.recv(data, 0, data.length);
|
||||||
receiver.advance();
|
receiver.advance();
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] data = new byte[buffer.writerIndex()];
|
|
||||||
buffer.readBytes(data);
|
|
||||||
|
|
||||||
if (delivery.getRemoteState() instanceof TransactionalState) {
|
if (delivery.getRemoteState() instanceof TransactionalState) {
|
||||||
|
|
||||||
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
|
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
|
||||||
|
@ -179,10 +172,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
rejected.setError(condition);
|
rejected.setError(condition);
|
||||||
delivery.disposition(rejected);
|
delivery.disposition(rejected);
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
} finally {
|
|
||||||
if (buffer != null) {
|
|
||||||
buffer.release();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -83,7 +83,7 @@
|
||||||
<jgroups.version>3.6.9.Final</jgroups.version>
|
<jgroups.version>3.6.9.Final</jgroups.version>
|
||||||
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
|
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
|
||||||
<netty.version>4.1.6.Final</netty.version>
|
<netty.version>4.1.6.Final</netty.version>
|
||||||
<proton.version>0.16.0</proton.version>
|
<proton.version>0.18.0</proton.version>
|
||||||
<resteasy.version>3.0.19.Final</resteasy.version>
|
<resteasy.version>3.0.19.Final</resteasy.version>
|
||||||
<slf4j.version>1.7.21</slf4j.version>
|
<slf4j.version>1.7.21</slf4j.version>
|
||||||
<qpid.jms.version>0.20.0</qpid.jms.version>
|
<qpid.jms.version>0.20.0</qpid.jms.version>
|
||||||
|
|
|
@ -57,6 +57,11 @@ public class UnmodifiableDelivery implements Delivery {
|
||||||
return delivery.getDataLength();
|
return delivery.getDataLength();
|
||||||
} */
|
} */
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int available() {
|
||||||
|
return delivery.available();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DeliveryState getLocalState() {
|
public DeliveryState getLocalState() {
|
||||||
return delivery.getLocalState();
|
return delivery.getLocalState();
|
||||||
|
|
Loading…
Reference in New Issue