This commit is contained in:
Clebert Suconic 2018-09-13 16:22:29 -04:00
commit 86c4d1b85b
4 changed files with 133 additions and 14 deletions

View File

@ -109,6 +109,8 @@ public class AMQPSessionCallback implements SessionCallback {
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>(); private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
private CreditRunnable creditRunnable;
public AMQPSessionCallback(AMQPConnectionCallback protonSPI, public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager, ProtonProtocolManager manager,
AMQPConnectionContext connection, AMQPConnectionContext connection,
@ -580,8 +582,23 @@ public class AMQPSessionCallback implements SessionCallback {
final int threshold, final int threshold,
final Receiver receiver) { final Receiver receiver) {
try { try {
/*
* The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the
* runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this
* may cause a memory leak, one is enough.
* */
if (creditRunnable != null && !creditRunnable.isRun())
return;
PagingManager pagingManager = manager.getServer().getPagingManager(); PagingManager pagingManager = manager.getServer().getPagingManager();
Runnable creditRunnable = () -> { creditRunnable = new CreditRunnable() {
boolean isRun = false;
@Override
public boolean isRun() {
return isRun;
}
@Override
public void run() {
connection.lock(); connection.lock();
try { try {
if (receiver.getCredit() <= threshold) { if (receiver.getCredit() <= threshold) {
@ -591,9 +608,11 @@ public class AMQPSessionCallback implements SessionCallback {
} }
} }
} finally { } finally {
isRun = true;
connection.unlock(); connection.unlock();
} }
connection.flush(); connection.flush();
}
}; };
if (address == null) { if (address == null) {
@ -772,5 +791,7 @@ public class AMQPSessionCallback implements SessionCallback {
} }
} }
interface CreditRunnable extends Runnable {
boolean isRun();
}
} }

View File

@ -262,6 +262,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
delivery.disposition(rejected); delivery.disposition(rejected);
delivery.settle(); delivery.settle();
flow(amqpCredits, minCreditRefresh);
} }
} }

View File

@ -658,6 +658,9 @@ public class PagingStoreImpl implements PagingStore {
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) { if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (isFull()) { if (isFull()) {
if (runWhenAvailable != null) {
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
}
return false; return false;
} }
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
@ -704,7 +707,7 @@ public class PagingStoreImpl implements PagingStore {
ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString()); ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString());
} }
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
if (usingGlobalMaxSize && !globalFull || maxSize != -1) { if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
checkReleaseMemory(globalFull, newSize); checkReleaseMemory(globalFull, newSize);
} }

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
import java.io.IOException;
public class AmqpFlowControlFailTest extends JMSClientTestSupport {
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
// For BLOCK tests
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
addressSettings.setMaxSizeBytes(1000);
// addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
}
@Test(timeout = 60000)
public void testMesagesNotSent() throws Exception {
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
AmqpConnection connection = addConnection(client.connect());
int messagesSent = 0;
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
boolean rejected = false;
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[10];
message.setBytes(payload);
try {
sender.send(message);
messagesSent++;
System.out.println("message = " + message);
} catch (IOException e) {
rejected = true;
}
}
assertTrue(rejected);
rejected = false;
assertEquals(0, sender.getSender().getCredit());
AmqpSession session2 = connection.createSession();
AmqpReceiver receiver = session2.createReceiver(getQueueName());
receiver.flow(messagesSent);
for (int i = 0; i < messagesSent; i++) {
AmqpMessage receive = receiver.receive();
receive.accept();
}
receiver.close();
session2.close();
assertEquals(1000, sender.getSender().getCredit());
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[100];
message.setBytes(payload);
try {
sender.send(message);
} catch (IOException e) {
rejected = true;
}
}
assertTrue(rejected);
assertEquals(0, sender.getSender().getCredit());
} finally {
connection.close();
}
}
}