Advertise that the connection supported delayed message delivery.
(cherry picked from commit 4a1f2f7ca7)
This commit is contained in:
Timothy Bish 2016-08-30 15:00:23 -04:00
parent 26f665b822
commit f15c0e8e0a
3 changed files with 8 additions and 1 deletions

View File

@ -51,6 +51,7 @@ public class AmqpSupport {
// Symbols used to announce connection information to remote peer. // Symbols used to announce connection information to remote peer.
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY; import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
import static org.apache.activemq.transport.amqp.AmqpSupport.CONTAINER_ID; import static org.apache.activemq.transport.amqp.AmqpSupport.CONTAINER_ID;
import static org.apache.activemq.transport.amqp.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.activemq.transport.amqp.AmqpSupport.INVALID_FIELD; import static org.apache.activemq.transport.amqp.AmqpSupport.INVALID_FIELD;
import static org.apache.activemq.transport.amqp.AmqpSupport.PLATFORM; import static org.apache.activemq.transport.amqp.AmqpSupport.PLATFORM;
import static org.apache.activemq.transport.amqp.AmqpSupport.PRODUCT; import static org.apache.activemq.transport.amqp.AmqpSupport.PRODUCT;
@ -186,7 +187,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
* @return the capabilities that are offered to new clients on connect. * @return the capabilities that are offered to new clients on connect.
*/ */
protected Symbol[] getConnectionCapabilitiesOffered() { protected Symbol[] getConnectionCapabilitiesOffered() {
return new Symbol[]{ ANONYMOUS_RELAY }; return new Symbol[]{ ANONYMOUS_RELAY, DELAYED_DELIVERY };
} }
/** /**

View File

@ -52,6 +52,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
private static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
@Parameters(name="{0}") @Parameters(name="{0}")
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
@ -97,6 +98,10 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
markAsInvalid("Broker did not indicate it support anonymous relay"); markAsInvalid("Broker did not indicate it support anonymous relay");
} }
if (!contains(offered, DELAYED_DELIVERY)) {
markAsInvalid("Broker did not indicate it support delayed message delivery");
}
Map<Symbol, Object> properties = connection.getRemoteProperties(); Map<Symbol, Object> properties = connection.getRemoteProperties();
if (!properties.containsKey(QUEUE_PREFIX)) { if (!properties.containsKey(QUEUE_PREFIX)) {
markAsInvalid("Broker did not send a queue prefix value"); markAsInvalid("Broker did not send a queue prefix value");