Add support for the receiver side of a sender link which carries a
desired capabilities array which can request to know if we support
delayed delivery, answer with an offered capability in that case.
(cherry picked from commit 4a1c05b628)
This commit is contained in:
Timothy Bish 2016-12-13 12:36:52 -05:00
parent 0225edb471
commit 31a79ea584
10 changed files with 253 additions and 10 deletions

View File

@ -26,7 +26,9 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.InvalidSelectorException;
@ -72,7 +74,7 @@ public class AmqpSession implements AmqpResource {
private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>();
private final Map<ConsumerId, AmqpSender> consumers = new HashMap<>();
private final AmqpConnection connection;
private final Session protonSession;
@ -190,7 +192,7 @@ public class AmqpSession implements AmqpResource {
if (target.getDynamic()) {
destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
// Currently we only support temporary destinations with delete on close lifetime policy.
@ -218,6 +220,14 @@ public class AmqpSession implements AmqpResource {
}
}
Symbol[] remoteDesiredCapabilities = protonReceiver.getRemoteDesiredCapabilities();
if (remoteDesiredCapabilities != null) {
List<Symbol> list = Arrays.asList(remoteDesiredCapabilities);
if (list.contains(AmqpSupport.DELAYED_DELIVERY)) {
protonReceiver.setOfferedCapabilities(new Symbol[] { AmqpSupport.DELAYED_DELIVERY });
}
}
receiver.setDestination(destination);
connection.sendToActiveMQ(producerInfo, new ResponseHandler() {
@Override
@ -255,7 +265,7 @@ public class AmqpSession implements AmqpResource {
LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName());
try {
final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
final Map<Symbol, Object> supportedFilters = new HashMap<>();
protonSender.setContext(sender);
boolean noLocal = false;
@ -311,7 +321,7 @@ public class AmqpSession implements AmqpResource {
} else if (source.getDynamic()) {
destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
// Currently we only support temporary destinations with delete on close lifetime policy.

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -70,9 +71,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
private boolean presettle;
private long sendTimeout = DEFAULT_SEND_TIMEOUT;
private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
private final Set<Delivery> pending = new LinkedHashSet<>();
private byte[] encodeBuffer = new byte[1024 * 8];
private Symbol[] desiredCapabilities;
private Symbol[] offeredCapabilities;
private Map<Symbol, Object> properties;
/**
* Create a new sender instance.
*
@ -245,6 +251,30 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
this.sendTimeout = sendTimeout;
}
public void setDesiredCapabilities(Symbol[] desiredCapabilities) {
if (getEndpoint() != null) {
throw new IllegalStateException("Endpoint already established");
}
this.desiredCapabilities = desiredCapabilities;
}
public void setOfferedCapabilities(Symbol[] offeredCapabilities) {
if (getEndpoint() != null) {
throw new IllegalStateException("Endpoint already established");
}
this.offeredCapabilities = offeredCapabilities;
}
public void setProperties(Map<Symbol, Object> properties) {
if (getEndpoint() != null) {
throw new IllegalStateException("Endpoint already established");
}
this.properties = properties;
}
//----- Private Sender implementation ------------------------------------//
private void checkClosed() {
@ -279,6 +309,10 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
sender.setDesiredCapabilities(desiredCapabilities);
sender.setOfferedCapabilities(offeredCapabilities);
sender.setProperties(properties);
setEndpoint(sender);
super.doOpen();
@ -408,7 +442,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
@Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
List<Delivery> toRemove = new ArrayList<Delivery>();
List<Delivery> toRemove = new ArrayList<>();
for (Delivery delivery : pending) {
DeliveryState state = delivery.getRemoteState();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.engine.Connection;
@ -87,7 +89,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender() throws Exception {
return createSender(null, false);
return createSender(null, false, null, null, null);
}
/**
@ -101,7 +103,39 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address) throws Exception {
return createSender(address, false);
return createSender(address, false, null, null, null);
}
/**
* Create a sender instance using the given address
*
* @param address
* the address to which the sender will produce its messages.
* @param desiredCapabilities
* the capabilities that the caller wants the remote to support.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address, Symbol[] desiredCapabilities) throws Exception {
return createSender(address, false, desiredCapabilities, null, null);
}
/**
* Create a sender instance using the given address
*
* @param address
* the address to which the sender will produce its messages.
* @param presettle
* controls if the created sender produces message that have already been marked settled.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address, boolean presettle) throws Exception {
return createSender(address, presettle, null, null, null);
}
/**
@ -111,16 +145,26 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* the address to which the sender will produce its messages.
* @param presettle
* controls if the created sender produces message that have already been marked settled.
* @param desiredCapabilities
* the capabilities that the caller wants the remote to support.
* @param offeredCapabilities
* the capabilities that the caller wants the advertise support for.
* @param properties
* the properties to send as part of the sender open.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address, boolean presettle) throws Exception {
public AmqpSender createSender(final String address, boolean presettle, Symbol[] desiredCapabilities, Symbol[] offeredCapabilities, Map<Symbol, Object> properties) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId());
sender.setPresettle(presettle);
sender.setDesiredCapabilities(desiredCapabilities);
sender.setOfferedCapabilities(offeredCapabilities);
sender.setProperties(properties);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@ -166,9 +210,35 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target, String senderId) throws Exception {
return createSender(target, senderId, null, null, null);
}
/**
* Create a sender instance using the given Target
*
* @param target
* the caller created and configured Target used to create the sender link.
* @param sender
* the sender ID to assign to the newly created Sender.
* @param desiredCapabilities
* the capabilities that the caller wants the remote to support.
* @param offeredCapabilities
* the capabilities that the caller wants the advertise support for.
* @param properties
* the properties to send as part of the sender open.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target, String senderId, Symbol[] desiredCapabilities, Symbol[] offeredCapabilities, Map<Symbol, Object> properties) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
sender.setDesiredCapabilities(desiredCapabilities);
sender.setOfferedCapabilities(offeredCapabilities);
sender.setProperties(properties);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {

View File

@ -39,6 +39,7 @@ public class AmqpSupport {
// Symbols used for connection capabilities
public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
// Symbols used to announce connection error information
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");

View File

@ -254,6 +254,7 @@ public class UnmodifiableLink implements Link {
return link.detached();
}
@Override
public Record attachments() {
return link.attachments();
}
@ -272,4 +273,34 @@ public class UnmodifiableLink implements Link {
public Map<Symbol, Object> getRemoteProperties() {
return link.getRemoteProperties();
}
@Override
public Symbol[] getDesiredCapabilities() {
return link.getDesiredCapabilities();
}
@Override
public Symbol[] getOfferedCapabilities() {
return link.getOfferedCapabilities();
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return link.getRemoteDesiredCapabilities();
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return link.getRemoteOfferedCapabilities();
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp.client.util;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Receiver;
/**
@ -56,4 +57,9 @@ public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver {
public void setDrain(boolean drain) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int recv(WritableBuffer buffer) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp.client.util;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Sender;
/**
@ -42,4 +43,9 @@ public class UnmodifiableSender extends UnmodifiableLink implements Sender {
public void abort() {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public int send(ReadableBuffer buffer) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -17,7 +17,9 @@
package org.apache.activemq.transport.amqp.client.util;
import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
@ -147,4 +149,49 @@ public class UnmodifiableSession implements Session {
public void setOutgoingWindow(long outgoingWindowSize) {
throw new UnsupportedOperationException("Cannot alter the Session");
}
@Override
public Symbol[] getDesiredCapabilities() {
return session.getDesiredCapabilities();
}
@Override
public Symbol[] getOfferedCapabilities() {
return session.getOfferedCapabilities();
}
@Override
public Map<Symbol, Object> getProperties() {
return session.getProperties();
}
@Override
public Symbol[] getRemoteDesiredCapabilities() {
return session.getRemoteDesiredCapabilities();
}
@Override
public Symbol[] getRemoteOfferedCapabilities() {
return session.getRemoteOfferedCapabilities();
}
@Override
public Map<Symbol, Object> getRemoteProperties() {
return session.getRemoteProperties();
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
@Override
public void setProperties(Map<Symbol, Object> capabilities) {
throw new UnsupportedOperationException("Cannot alter the Link state");
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp.interop;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -23,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -31,8 +33,10 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Test;
@ -206,4 +210,38 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
sender.close();
connection.close();
}
@Test
public void testDeliveryDelayOfferedWhenRequested() throws Exception {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Sender sender) {
Symbol[] offered = sender.getRemoteOfferedCapabilities();
if (!contains(offered, AmqpSupport.DELAYED_DELIVERY)) {
markAsInvalid("Broker did not indicate it support delayed message delivery");
}
}
});
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerView.getQueues().length);
AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[] { AmqpSupport.DELAYED_DELIVERY });
assertNotNull(sender);
assertEquals(1, brokerView.getQueues().length);
connection.getStateInspector().assertValid();
sender.close();
connection.close();
}
}

View File

@ -101,7 +101,7 @@
<p2psockets-version>1.1.2</p2psockets-version>
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.15.0</qpid-proton-version>
<qpid-proton-version>0.16.0</qpid-proton-version>
<qpid-jms-version>0.11.1</qpid-jms-version>
<qpid-jms-netty-version>4.0.41.Final</qpid-jms-netty-version>
<qpid-jms-proton-version>0.14.0</qpid-jms-proton-version>