mirror of https://github.com/apache/activemq.git
Use constant symbols that are now available in v0.9+ of proton-j
This commit is contained in:
parent
4b4cf7c09e
commit
6a2ffca57e
|
@ -38,6 +38,7 @@ import org.apache.qpid.proton.amqp.DescribedType;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Released;
|
import org.apache.qpid.proton.amqp.messaging.Released;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Target;
|
import org.apache.qpid.proton.amqp.messaging.Target;
|
||||||
|
@ -58,14 +59,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
|
||||||
|
|
||||||
// TODO: Use constants available from Proton 0.9
|
|
||||||
private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:accepted:list");
|
|
||||||
private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:rejected:list");
|
|
||||||
private static final Symbol MODIFIED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:modified:list");
|
|
||||||
private static final Symbol RELEASED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:released:list");
|
|
||||||
|
|
||||||
private final AtomicBoolean closed = new AtomicBoolean();
|
private final AtomicBoolean closed = new AtomicBoolean();
|
||||||
|
|
||||||
private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<AmqpMessage>();
|
private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<AmqpMessage>();
|
||||||
|
|
||||||
private final AmqpSession session;
|
private final AmqpSession session;
|
||||||
|
@ -534,8 +528,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
|
|
||||||
protected void configureSource(Source source) {
|
protected void configureSource(Source source) {
|
||||||
Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
|
Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
|
||||||
Symbol[] outcomes = new Symbol[]{ACCEPTED_DESCRIPTOR_SYMBOL, REJECTED_DESCRIPTOR_SYMBOL,
|
Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
|
||||||
RELEASED_DESCRIPTOR_SYMBOL, MODIFIED_DESCRIPTOR_SYMBOL};
|
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
|
||||||
|
|
||||||
if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
|
if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
|
||||||
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
|
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
|
||||||
|
|
|
@ -52,9 +52,6 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
|
||||||
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
|
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
|
||||||
//TODO: Use constants available from Proton 0.9
|
|
||||||
private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:accepted:list");
|
|
||||||
private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:rejected:list");
|
|
||||||
|
|
||||||
public static final long DEFAULT_SEND_TIMEOUT = 15000;
|
public static final long DEFAULT_SEND_TIMEOUT = 15000;
|
||||||
|
|
||||||
|
@ -240,7 +237,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
||||||
@Override
|
@Override
|
||||||
protected void doOpen() {
|
protected void doOpen() {
|
||||||
|
|
||||||
Symbol[] outcomes = new Symbol[]{ACCEPTED_DESCRIPTOR_SYMBOL, REJECTED_DESCRIPTOR_SYMBOL};
|
Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL };
|
||||||
Source source = new Source();
|
Source source = new Source();
|
||||||
source.setAddress(senderId);
|
source.setAddress(senderId);
|
||||||
source.setOutcomes(outcomes);
|
source.setOutcomes(outcomes);
|
||||||
|
|
Loading…
Reference in New Issue