ARTEMIS-722 Add DELAYED_DELIVERY capability to server connection open

The server should indicate to clients that it supports the message
annotation that allows message delivery to be delayed
'x-opt-delivery-time'
This commit is contained in:
Timothy Bish 2016-09-07 16:24:52 -04:00 committed by Clebert Suconic
parent 795ddfc1f2
commit 42ff4a6048
5 changed files with 144 additions and 6 deletions

View File

@ -16,6 +16,8 @@
*/
package org.proton.plug;
import org.apache.qpid.proton.amqp.Symbol;
import io.netty.buffer.ByteBuf;
public interface AMQPConnectionContext {
@ -30,6 +32,14 @@ public interface AMQPConnectionContext {
SASLResult getSASLResult();
/**
* Load and return a <code>[]Symbol</code> that contains the connection capabilities
* offered to new connections
*
* @return the capabilities that are offered to new remote peers on connect.
*/
Symbol[] getConnectionCapabilitiesOffered();
/**
* Even though we are currently always sending packets asynchronsouly
* we have a possibility to start trusting on the network flow control

View File

@ -46,6 +46,7 @@ public class AmqpSupport {
// Symbols used to announce connection information to remote peer.
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");

View File

@ -16,6 +16,9 @@
*/
package org.proton.plug.context;
import static org.proton.plug.AmqpSupport.PRODUCT;
import static org.proton.plug.AmqpSupport.VERSION;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@ -76,11 +79,13 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
ScheduledExecutorService scheduledPool) {
this.connectionCallback = connectionCallback;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
connectionProperties.put(Symbol.valueOf("product"), "apache-activemq-artemis");
connectionProperties.put(Symbol.valueOf("version"), VersionLoader.getVersion().getFullVersion());
connectionProperties.put(PRODUCT, "apache-activemq-artemis");
connectionProperties.put(VERSION, VersionLoader.getVersion().getFullVersion());
this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this);
this.handler = ProtonHandler.Factory.create(dispatchExecutor);
this.handler = ProtonHandler.Factory.create(dispatchExecutor);
Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false);
if (idleTimeout > 0) {
@ -211,6 +216,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
connection.setContext(AbstractConnectionContext.this);
connection.setContainer(containerId);
connection.setProperties(connectionProperties);
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
}
initialise();
@ -326,9 +332,10 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
System.err.println("Handler is null, can't delivery " + delivery);
}
}
}
@Override
public Symbol[] getConnectionCapabilitiesOffered() {
return null;
}
}

View File

@ -16,6 +16,9 @@
*/
package org.proton.plug.context.server;
import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
@ -79,4 +82,7 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
}
}
public Symbol[] getConnectionCapabilitiesOffered() {
return new Symbol[]{DELAYED_DELIVERY};
}
}

View File

@ -16,6 +16,11 @@
*/
package org.apache.activemq.artemis.tests.integration.proton;
import static org.proton.plug.AmqpSupport.contains;
import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
import static org.proton.plug.AmqpSupport.PRODUCT;
import static org.proton.plug.AmqpSupport.VERSION;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -63,6 +68,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@ -210,6 +216,114 @@ public class ProtonTest extends ProtonTestBase {
}
}
@Test(timeout = 60000)
public void testConnectionCarriesExpectedCapabilities() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) {
Symbol[] offered = connection.getRemoteOfferedCapabilities();
if (!contains(offered, DELAYED_DELIVERY)) {
markAsInvalid("Broker did not indicate it support delayed message delivery");
return;
}
Map<Symbol, Object> properties = connection.getRemoteProperties();
if (!properties.containsKey(PRODUCT)) {
markAsInvalid("Broker did not send a queue product name value");
return;
}
if (!properties.containsKey(VERSION)) {
markAsInvalid("Broker did not send a queue version value");
return;
}
}
});
AmqpConnection connection = client.connect();
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
}
finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(address);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
// Shouldn't get this since we delayed the message.
assertNull(receiver.receive(5, TimeUnit.SECONDS));
}
finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(address);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 2000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
assertNotNull(msgDeliveryTime);
assertEquals(deliveryTime, msgDeliveryTime.longValue());
}
finally {
connection.close();
}
}
@Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol