diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index 0527211220..6a0cbf738e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -26,7 +26,9 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination; import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.jms.InvalidSelectorException; @@ -72,7 +74,7 @@ public class AmqpSession implements AmqpResource { private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class); - private final Map consumers = new HashMap(); + private final Map consumers = new HashMap<>(); private final AmqpConnection connection; private final Session protonSession; @@ -190,7 +192,7 @@ public class AmqpSession implements AmqpResource { if (target.getDynamic()) { destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities()); - Map dynamicNodeProperties = new HashMap(); + Map dynamicNodeProperties = new HashMap<>(); dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance()); // Currently we only support temporary destinations with delete on close lifetime policy. @@ -218,6 +220,14 @@ public class AmqpSession implements AmqpResource { } } + Symbol[] remoteDesiredCapabilities = protonReceiver.getRemoteDesiredCapabilities(); + if (remoteDesiredCapabilities != null) { + List list = Arrays.asList(remoteDesiredCapabilities); + if (list.contains(AmqpSupport.DELAYED_DELIVERY)) { + protonReceiver.setOfferedCapabilities(new Symbol[] { AmqpSupport.DELAYED_DELIVERY }); + } + } + receiver.setDestination(destination); connection.sendToActiveMQ(producerInfo, new ResponseHandler() { @Override @@ -255,7 +265,7 @@ public class AmqpSession implements AmqpResource { LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName()); try { - final Map supportedFilters = new HashMap(); + final Map supportedFilters = new HashMap<>(); protonSender.setContext(sender); boolean noLocal = false; @@ -311,7 +321,7 @@ public class AmqpSession implements AmqpResource { } else if (source.getDynamic()) { destination = connection.createTemporaryDestination(protonSender, source.getCapabilities()); - Map dynamicNodeProperties = new HashMap(); + Map dynamicNodeProperties = new HashMap<>(); dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance()); // Currently we only support temporary destinations with delete on close lifetime policy. diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 3850ec7082..8d28688f18 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -70,9 +71,14 @@ public class AmqpSender extends AmqpAbstractResource { private boolean presettle; private long sendTimeout = DEFAULT_SEND_TIMEOUT; - private final Set pending = new LinkedHashSet(); + private final Set pending = new LinkedHashSet<>(); private byte[] encodeBuffer = new byte[1024 * 8]; + private Symbol[] desiredCapabilities; + private Symbol[] offeredCapabilities; + private Map properties; + + /** * Create a new sender instance. * @@ -245,6 +251,30 @@ public class AmqpSender extends AmqpAbstractResource { this.sendTimeout = sendTimeout; } + public void setDesiredCapabilities(Symbol[] desiredCapabilities) { + if (getEndpoint() != null) { + throw new IllegalStateException("Endpoint already established"); + } + + this.desiredCapabilities = desiredCapabilities; + } + + public void setOfferedCapabilities(Symbol[] offeredCapabilities) { + if (getEndpoint() != null) { + throw new IllegalStateException("Endpoint already established"); + } + + this.offeredCapabilities = offeredCapabilities; + } + + public void setProperties(Map properties) { + if (getEndpoint() != null) { + throw new IllegalStateException("Endpoint already established"); + } + + this.properties = properties; + } + //----- Private Sender implementation ------------------------------------// private void checkClosed() { @@ -279,6 +309,10 @@ public class AmqpSender extends AmqpAbstractResource { } sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + sender.setDesiredCapabilities(desiredCapabilities); + sender.setOfferedCapabilities(offeredCapabilities); + sender.setProperties(properties); + setEndpoint(sender); super.doOpen(); @@ -408,7 +442,7 @@ public class AmqpSender extends AmqpAbstractResource { @Override public void processDeliveryUpdates(AmqpConnection connection) throws IOException { - List toRemove = new ArrayList(); + List toRemove = new ArrayList<>(); for (Delivery delivery : pending) { DeliveryState state = delivery.getRemoteState(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index b239daed1c..88cba94369 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.client; import java.io.IOException; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.engine.Connection; @@ -87,7 +89,7 @@ public class AmqpSession extends AmqpAbstractResource { * @throws Exception if an error occurs while creating the sender. */ public AmqpSender createSender() throws Exception { - return createSender(null, false); + return createSender(null, false, null, null, null); } /** @@ -101,7 +103,39 @@ public class AmqpSession extends AmqpAbstractResource { * @throws Exception if an error occurs while creating the sender. */ public AmqpSender createSender(final String address) throws Exception { - return createSender(address, false); + return createSender(address, false, null, null, null); + } + + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * @param desiredCapabilities + * the capabilities that the caller wants the remote to support. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, Symbol[] desiredCapabilities) throws Exception { + return createSender(address, false, desiredCapabilities, null, null); + } + + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * @param presettle + * controls if the created sender produces message that have already been marked settled. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, boolean presettle) throws Exception { + return createSender(address, presettle, null, null, null); } /** @@ -111,16 +145,26 @@ public class AmqpSession extends AmqpAbstractResource { * the address to which the sender will produce its messages. * @param presettle * controls if the created sender produces message that have already been marked settled. + * @param desiredCapabilities + * the capabilities that the caller wants the remote to support. + * @param offeredCapabilities + * the capabilities that the caller wants the advertise support for. + * @param properties + * the properties to send as part of the sender open. * * @return a newly created sender that is ready for use. * * @throws Exception if an error occurs while creating the sender. */ - public AmqpSender createSender(final String address, boolean presettle) throws Exception { + public AmqpSender createSender(final String address, boolean presettle, Symbol[] desiredCapabilities, Symbol[] offeredCapabilities, Map properties) throws Exception { checkClosed(); final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId()); sender.setPresettle(presettle); + sender.setDesiredCapabilities(desiredCapabilities); + sender.setOfferedCapabilities(offeredCapabilities); + sender.setProperties(properties); + final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { @@ -166,9 +210,35 @@ public class AmqpSession extends AmqpAbstractResource { * @throws Exception if an error occurs while creating the receiver. */ public AmqpSender createSender(Target target, String senderId) throws Exception { + return createSender(target, senderId, null, null, null); + } + + /** + * Create a sender instance using the given Target + * + * @param target + * the caller created and configured Target used to create the sender link. + * @param sender + * the sender ID to assign to the newly created Sender. + * @param desiredCapabilities + * the capabilities that the caller wants the remote to support. + * @param offeredCapabilities + * the capabilities that the caller wants the advertise support for. + * @param properties + * the properties to send as part of the sender open. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpSender createSender(Target target, String senderId, Symbol[] desiredCapabilities, Symbol[] offeredCapabilities, Map properties) throws Exception { checkClosed(); final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId); + sender.setDesiredCapabilities(desiredCapabilities); + sender.setOfferedCapabilities(offeredCapabilities); + sender.setProperties(properties); + final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java index 9aca5b8405..8ffcfbd11f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java @@ -39,6 +39,7 @@ public class AmqpSupport { // Symbols used for connection capabilities public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container"); public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY"); // Symbols used to announce connection error information public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java index 2f7ff8e9da..9b17d8620f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java @@ -254,6 +254,7 @@ public class UnmodifiableLink implements Link { return link.detached(); } + @Override public Record attachments() { return link.attachments(); } @@ -272,4 +273,34 @@ public class UnmodifiableLink implements Link { public Map getRemoteProperties() { return link.getRemoteProperties(); } + + @Override + public Symbol[] getDesiredCapabilities() { + return link.getDesiredCapabilities(); + } + + @Override + public Symbol[] getOfferedCapabilities() { + return link.getOfferedCapabilities(); + } + + @Override + public Symbol[] getRemoteDesiredCapabilities() { + return link.getRemoteDesiredCapabilities(); + } + + @Override + public Symbol[] getRemoteOfferedCapabilities() { + return link.getRemoteOfferedCapabilities(); + } + + @Override + public void setDesiredCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void setOfferedCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java index 1b07ed07f4..121a7f5eee 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp.client.util; +import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.engine.Receiver; /** @@ -56,4 +57,9 @@ public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver { public void setDrain(boolean drain) { throw new UnsupportedOperationException("Cannot alter the Link state"); } + + @Override + public int recv(WritableBuffer buffer) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java index 1517a93635..fbe577276d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp.client.util; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Sender; /** @@ -42,4 +43,9 @@ public class UnmodifiableSender extends UnmodifiableLink implements Sender { public void abort() { throw new UnsupportedOperationException("Cannot alter the Link state"); } + + @Override + public int send(ReadableBuffer buffer) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java index 31b045208a..055bb51a06 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java @@ -17,7 +17,9 @@ package org.apache.activemq.transport.amqp.client.util; import java.util.EnumSet; +import java.util.Map; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.EndpointState; @@ -147,4 +149,49 @@ public class UnmodifiableSession implements Session { public void setOutgoingWindow(long outgoingWindowSize) { throw new UnsupportedOperationException("Cannot alter the Session"); } + + @Override + public Symbol[] getDesiredCapabilities() { + return session.getDesiredCapabilities(); + } + + @Override + public Symbol[] getOfferedCapabilities() { + return session.getOfferedCapabilities(); + } + + @Override + public Map getProperties() { + return session.getProperties(); + } + + @Override + public Symbol[] getRemoteDesiredCapabilities() { + return session.getRemoteDesiredCapabilities(); + } + + @Override + public Symbol[] getRemoteOfferedCapabilities() { + return session.getRemoteOfferedCapabilities(); + } + + @Override + public Map getRemoteProperties() { + return session.getRemoteProperties(); + } + + @Override + public void setDesiredCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void setOfferedCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void setProperties(Map capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java index 3c3b75d499..da9e011246 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp.interop; +import static org.apache.activemq.transport.amqp.AmqpSupport.contains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -23,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -31,8 +33,10 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpSupport; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; import org.junit.Test; @@ -206,4 +210,38 @@ public class AmqpSenderTest extends AmqpClientTestSupport { sender.close(); connection.close(); } + + @Test + public void testDeliveryDelayOfferedWhenRequested() throws Exception { + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Sender sender) { + + Symbol[] offered = sender.getRemoteOfferedCapabilities(); + if (!contains(offered, AmqpSupport.DELAYED_DELIVERY)) { + markAsInvalid("Broker did not indicate it support delayed message delivery"); + } + } + }); + + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerView.getQueues().length); + + AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[] { AmqpSupport.DELAYED_DELIVERY }); + assertNotNull(sender); + + assertEquals(1, brokerView.getQueues().length); + + connection.getStateInspector().assertValid(); + + sender.close(); + connection.close(); + } } \ No newline at end of file diff --git a/pom.xml b/pom.xml index c4479c081c..37c913c5dd 100644 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ 1.1.2 1.4.0 3.4.6 - 0.15.0 + 0.16.0 0.11.1 4.0.41.Final 0.14.0