diff --git a/artemis-protocols/artemis-amqp-protocol/pom.xml b/artemis-protocols/artemis-amqp-protocol/pom.xml
index 5f91a51fdd..e383b3740a 100644
--- a/artemis-protocols/artemis-amqp-protocol/pom.xml
+++ b/artemis-protocols/artemis-amqp-protocol/pom.xml
@@ -123,5 +123,10 @@
org.osgi
osgi.cmpn
+
+ org.mockito
+ mockito-core
+ test
+
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index aad89a8d0c..c3df1a73d0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -218,6 +218,23 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (!delivery.isReadable()) {
return;
}
+
+ if (delivery.isAborted()) {
+ receiver = ((Receiver) delivery.getLink());
+
+ // Aborting implicitly remotely settles, so advance
+ // receiver to the next delivery and settle locally.
+ receiver.advance();
+ delivery.settle();
+
+ // Replenish the credit if not doing a drain
+ if (!receiver.getDrain()) {
+ receiver.flow(1);
+ }
+
+ return;
+ }
+
if (delivery.isPartial()) {
return;
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
new file mode 100644
index 0000000000..88dfe3a9d3
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Test;
+
+public class ProtonServerReceiverContextTest {
+
+ @Test
+ public void testOnMessageWithAbortedDelivery() throws Exception {
+ doOnMessageWithAbortedDeliveryTestImpl(false);
+ }
+
+ @Test
+ public void testOnMessageWithAbortedDeliveryDrain() throws Exception {
+ doOnMessageWithAbortedDeliveryTestImpl(true);
+ }
+
+ private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException {
+ Receiver mockReceiver = mock(Receiver.class);
+ AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
+
+ when(mockConnContext.getAmqpCredits()).thenReturn(100);
+ when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
+
+ ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
+
+ Delivery mockDelivery = mock(Delivery.class);
+ when(mockDelivery.isReadable()).thenReturn(true);
+ when(mockDelivery.isAborted()).thenReturn(true);
+ when(mockDelivery.isPartial()).thenReturn(true);
+ when(mockDelivery.getLink()).thenReturn(mockReceiver);
+
+ if (drain) {
+ when(mockReceiver.getDrain()).thenReturn(true);
+ }
+
+ rc.onMessage(mockDelivery);
+
+ verify(mockReceiver, times(1)).advance();
+ verify(mockDelivery, times(1)).settle();
+
+ verify(mockReceiver, times(1)).getDrain();
+ if (!drain) {
+ verify(mockReceiver, times(1)).flow(1);
+ }
+ verifyNoMoreInteractions(mockReceiver);
+ }
+
+}