From 82795b7bff64da8770a0c53b9eb1373b2d63fbf6 Mon Sep 17 00:00:00 2001 From: andytaylor Date: Wed, 12 Sep 2018 11:16:41 +0100 Subject: [PATCH] ARTEMIS-1898 - make sure tosend credits on rejected messages And also to run the credit runnables once memory is free in fail mode https://issues.apache.org/jira/browse/ARTEMIS-1898 --- .../amqp/broker/AMQPSessionCallback.java | 47 +++++++--- .../proton/ProtonServerReceiverContext.java | 1 + .../core/paging/impl/PagingStoreImpl.java | 5 +- .../amqp/AmqpFlowControlFailTest.java | 94 +++++++++++++++++++ 4 files changed, 133 insertions(+), 14 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 6b163ae8ca..7fef3dbc68 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -109,6 +109,8 @@ public class AMQPSessionCallback implements SessionCallback { private final AddressQueryCache addressQueryCache = new AddressQueryCache<>(); + private CreditRunnable creditRunnable; + public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -580,20 +582,37 @@ public class AMQPSessionCallback implements SessionCallback { final int threshold, final Receiver receiver) { try { + /* + * The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the + * runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this + * may cause a memory leak, one is enough. + * */ + if (creditRunnable != null && !creditRunnable.isRun()) + return; PagingManager pagingManager = manager.getServer().getPagingManager(); - Runnable creditRunnable = () -> { - connection.lock(); - try { - if (receiver.getCredit() <= threshold) { - int topUp = credits - receiver.getCredit(); - if (topUp > 0) { - receiver.flow(topUp); - } - } - } finally { - connection.unlock(); + creditRunnable = new CreditRunnable() { + boolean isRun = false; + @Override + public boolean isRun() { + return isRun; + } + + @Override + public void run() { + connection.lock(); + try { + if (receiver.getCredit() <= threshold) { + int topUp = credits - receiver.getCredit(); + if (topUp > 0) { + receiver.flow(topUp); + } + } + } finally { + isRun = true; + connection.unlock(); + } + connection.flush(); } - connection.flush(); }; if (address == null) { @@ -772,5 +791,7 @@ public class AMQPSessionCallback implements SessionCallback { } } - + interface CreditRunnable extends Runnable { + boolean isRun(); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index cdd1362d43..0f0e9d5070 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -262,6 +262,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements delivery.disposition(rejected); delivery.settle(); + flow(amqpCredits, minCreditRefresh); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 5f0d3c8f94..1392cfc986 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -658,6 +658,9 @@ public class PagingStoreImpl implements PagingStore { if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) { if (isFull()) { + if (runWhenAvailable != null) { + onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); + } return false; } } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { @@ -704,7 +707,7 @@ public class PagingStoreImpl implements PagingStore { ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString()); } - if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { if (usingGlobalMaxSize && !globalFull || maxSize != -1) { checkReleaseMemory(globalFull, newSize); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java new file mode 100644 index 0000000000..2f65dfb20d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +import java.io.IOException; + +public class AmqpFlowControlFailTest extends JMSClientTestSupport { + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + // For BLOCK tests + AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#"); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); + addressSettings.setMaxSizeBytes(1000); + // addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + } + + @Test(timeout = 60000) + public void testMesagesNotSent() throws Exception { + AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI()); + AmqpConnection connection = addConnection(client.connect()); + int messagesSent = 0; + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName()); + boolean rejected = false; + for (int i = 0; i < 1000; i++) { + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[10]; + message.setBytes(payload); + try { + sender.send(message); + messagesSent++; + System.out.println("message = " + message); + } catch (IOException e) { + rejected = true; + } + } + assertTrue(rejected); + rejected = false; + assertEquals(0, sender.getSender().getCredit()); + AmqpSession session2 = connection.createSession(); + AmqpReceiver receiver = session2.createReceiver(getQueueName()); + receiver.flow(messagesSent); + for (int i = 0; i < messagesSent; i++) { + AmqpMessage receive = receiver.receive(); + receive.accept(); + } + receiver.close(); + session2.close(); + assertEquals(1000, sender.getSender().getCredit()); + for (int i = 0; i < 1000; i++) { + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[100]; + message.setBytes(payload); + try { + sender.send(message); + } catch (IOException e) { + rejected = true; + } + } + assertTrue(rejected); + assertEquals(0, sender.getSender().getCredit()); + } finally { + connection.close(); + } + } +}