ARTEMIS-636 Add AMQP Hard Soft Limit for BLOCK

This commit is contained in:
Martyn Taylor 2016-07-27 13:36:08 +01:00
parent 06fb4a1234
commit 2f721866ab
12 changed files with 185 additions and 41 deletions

View File

@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -56,6 +56,7 @@ import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AMQPSessionContext;
import org.proton.plug.SASLResult;
import org.proton.plug.context.ProtonPlugSender;
import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.proton.plug.sasl.PlainSASLResult;
public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback {
@ -351,18 +352,33 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
recoverContext();
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
if (store.isFull() && store.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK) {
ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + message.getAddress());
Rejected rejected = new Rejected();
rejected.setError(ec);
delivery.disposition(rejected);
connection.flush();
if (store.isRejectingMessages()) {
// We drop pre-settled messages (and abort any associated Tx)
if (delivery.remotelySettled()) {
if (serverSession.getCurrentTransaction() != null) {
String amqpAddress = delivery.getLink().getTarget().getAddress();
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
serverSession.getCurrentTransaction().markAsRollbackOnly(e);
}
}
else {
rejectMessage(delivery);
}
}
else {
serverSend(message, delivery, receiver);
}
}
private void rejectMessage(Delivery delivery) {
String address = delivery.getLink().getTarget().getAddress();
ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
Rejected rejected = new Rejected();
rejected.setError(ec);
delivery.disposition(rejected);
connection.flush();
}
private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
try {
serverSession.send(message, false);

View File

@ -91,40 +91,49 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
try {
sessionSPI.commitCurrentTX();
}
catch (ActiveMQAMQPException amqpE) {
throw amqpE;
}
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
}
}
delivery.settle();
}
}
catch (ActiveMQAMQPException amqpE) {
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
}
catch (Exception e) {
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription(e.getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
}
finally {
delivery.settle();
buffer.release();
}
}
private Rejected createRejected(Symbol amqpError, String message) {
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(amqpError);
condition.setDescription(message);
rejected.setError(condition);
return rejected;
}
@Override
public void onFlow(int credits, boolean drain) {
}
@Override
public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
//noop
// no op
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
//noop
// no op
}
}

View File

@ -128,6 +128,8 @@ public interface PagingStore extends ActiveMQComponent {
boolean isFull();
boolean isRejectingMessages();
/**
* Write lock the PagingStore.
*

View File

@ -123,6 +123,8 @@ public class PagingStoreImpl implements PagingStore {
private volatile AtomicBoolean blocking = new AtomicBoolean(false);
private long rejectThreshold;
public PagingStoreImpl(final SimpleString address,
final ScheduledExecutorService scheduledExecutor,
final long syncTimeout,
@ -187,6 +189,8 @@ public class PagingStoreImpl implements PagingStore {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
if (cursorProvider != null) {
cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
}
@ -1072,6 +1076,14 @@ public class PagingStoreImpl implements PagingStore {
return maxSize > 0 && getAddressSize() > maxSize;
}
@Override
public boolean isRejectingMessages() {
if (addressFullMessagePolicy != AddressFullMessagePolicy.BLOCK) {
return false;
}
return rejectThreshold != AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD && getAddressSize() > rejectThreshold;
}
@Override
public Collection<Integer> getCurrentIds() throws Exception {
List<Integer> ids = new ArrayList<>();

View File

@ -76,6 +76,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final int DEFAULT_QUEUE_PREFETCH = 1000;
// Default address drop threshold, applied to address settings with BLOCK policy. -1 means no threshold enabled.
public static final long DEFAULT_ADDRESS_REJECT_THRESHOLD = -1;
private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null;
@ -124,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
private Long maxSizeBytesRejectThreshold = null;
//from amq5
//make it transient
private transient Integer queuePrefetch = null;
@ -154,6 +159,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.autoDeleteJmsTopics = other.autoDeleteJmsTopics;
this.managementBrowsePageSize = other.managementBrowsePageSize;
this.queuePrefetch = other.queuePrefetch;
this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
}
public AddressSettings() {
@ -377,6 +383,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public long getMaxSizeBytesRejectThreshold() {
return (maxSizeBytesRejectThreshold == null) ? AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD : maxSizeBytesRejectThreshold;
}
public AddressSettings setMaxSizeBytesRejectThreshold(long maxSizeBytesRejectThreshold) {
this.maxSizeBytesRejectThreshold = maxSizeBytesRejectThreshold;
return this;
}
/**
* merge 2 objects in to 1
*
@ -456,6 +471,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (queuePrefetch == null) {
queuePrefetch = merged.queuePrefetch;
}
if (maxSizeBytesRejectThreshold == null) {
maxSizeBytesRejectThreshold = merged.maxSizeBytesRejectThreshold;
}
}
@Override
@ -521,6 +539,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
autoDeleteJmsTopics = BufferHelper.readNullableBoolean(buffer);
managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer);
}
@Override
@ -549,7 +569,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) +
BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) +
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize);
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) +
BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold);
}
@Override
@ -601,6 +622,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsTopics);
BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold);
}
/* (non-Javadoc)
@ -635,6 +658,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : queuePrefetch.hashCode());
return result;
}
@ -802,6 +826,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
else if (!queuePrefetch.equals(other.queuePrefetch))
return false;
if (maxSizeBytesRejectThreshold == null) {
if (other.maxSizeBytesRejectThreshold != null)
return false;
}
else if (!maxSizeBytesRejectThreshold.equals(other.maxSizeBytesRejectThreshold))
return false;
return true;
}
@ -825,6 +856,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
maxDeliveryAttempts +
", maxSizeBytes=" +
maxSizeBytes +
", maxSizeBytesRejectThreshold=" +
maxSizeBytesRejectThreshold +
", messageCounterHistoryDayLimit=" +
messageCounterHistoryDayLimit +
", pageSizeBytes=" +

View File

@ -2220,7 +2220,15 @@
<xsd:element name="max-size-bytes" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the maximum size (in bytes) to use in paging for an address (-1 means no limits)
the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and FAIL policies.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. Default = -1 (no limit).
</xsd:documentation>
</xsd:annotation>
</xsd:element>

View File

@ -59,6 +59,8 @@ public class AddressSettingsTest extends ActiveMQTestBase {
addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
addressSettingsToMerge.setRedeliveryDelay(1003);
addressSettingsToMerge.setPageSizeBytes(1004);
addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
addressSettings.merge(addressSettingsToMerge);
Assert.assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
Assert.assertEquals(addressSettings.getExpiryAddress(), exp);
@ -68,6 +70,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
Assert.assertEquals(addressSettings.getRedeliveryDelay(), 1003);
Assert.assertEquals(addressSettings.getPageSizeBytes(), 1004);
Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
}
@Test
@ -82,6 +85,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
addressSettingsToMerge.setMaxSizeBytes(1001);
addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
addressSettings.merge(addressSettingsToMerge);
AddressSettings addressSettingsToMerge2 = new AddressSettings();
@ -100,6 +104,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
Assert.assertEquals(addressSettings.getRedeliveryDelay(), 2003);
Assert.assertEquals(addressSettings.getRedeliveryMultiplier(), 2.5, 0.000001);
Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
}
@Test

View File

@ -275,25 +275,25 @@ control.
#### Blocking producer window based flow control using AMQP
Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support
flow control. Artemis CORE protocol and AMQP. Both protocols implement flow
control slightly differently and therefore address full BLOCK policy behaves
slightly different for clients uses each protocol respectively.
Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support flow control. Artemis CORE protocol and
AMQP. Both protocols implement flow control slightly differently and therefore address full BLOCK policy behaves slightly
different for clients that use each protocol respectively.
As explained earlier in this chapter the CORE protocol uses a producer window size
flow control system. Where credits (representing bytes) are allocated to producers,
if a producer wants to send a message it should wait until it has enough bytes available
to send it. AMQP flow control credits are not representative of bytes but instead represent
the number of messages a producer is permitted to send (regardless of size).
As explained earlier in this chapter the CORE protocol uses a producer window size flow control system. Where credits
(representing bytes) are allocated to producers, if a producer wants to send a message it should wait until it has
enough byte credits available for it to send. AMQP flow control credits are not representative of bytes but instead
represent the number of messages a producer is permitted to send (regardless of the message size).
BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis
will issue 100 credits to a client at a time and refresh them when the clients credits reaches 30.
The broker will stop issuing credits once an address is full. However, since AMQP credits represent
whole messages and not bytes, it would be possible for an AMQP client to significantly exceed an
address upper bound should the broker continue accepting messages until the clients credits are exhausted.
For this reason once an address has reached it's upper bound and is blocked (when using AMQP) Artemis
will start rejecting messages until the address becomes unblocked. This should be taken into consideration when writing
application code.
BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis will issue 100 credits
to a client at a time and refresh them when the clients credits reaches 30. The broker will stop issuing credits once an
address is full. However, since AMQP credits represent whole messages and not bytes, it would be possible in some
scenarios for an AMQP client to significantly exceed an address upper bound should the broker continue accepting
messages until the clients credits are exhausted. For this reason there is an additional parameter available on address
settings that specifies an upper bound on an address size in bytes. Once this upper bound is reach Artemis will start
rejecting AMQP messages. This limit is the max-size-bytes-reject-threshold and is by default set to -1 (or no limit).
This is additional parameter allows a kind of soft and hard limit, in normal circumstances the broker will utilize the
max-size-bytes parameter using using flow control to put back pressure on the client, but will protect the broker by
rejecting messages once the address size is reached.
### Rate limited flow control

View File

@ -412,7 +412,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return txContext.getTransactionId();
}
AmqpTransactionContext getTransactionContext() {
public AmqpTransactionContext getTransactionContext() {
return txContext;
}

View File

@ -213,7 +213,7 @@ public class AmqpTransactionContext {
//----- Internal access to context properties ----------------------------//
AmqpTransactionCoordinator getCoordinator() {
public AmqpTransactionCoordinator getCoordinator() {
return coordinator;
}

View File

@ -95,8 +95,13 @@ public class ProtonTest extends ActiveMQTestBase {
private static final String password = "guest";
private static final String brokerName = "my-broker";
private static final long maxSizeBytes = 1 * 1024 * 1024;
private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}")
@ -310,6 +315,7 @@ public class ProtonTest extends ActiveMQTestBase {
Assert.assertEquals(q.getMessageCount(), 0);
}
@Test
public void testRollbackConsumer() throws Throwable {
@ -342,8 +348,11 @@ public class ProtonTest extends ActiveMQTestBase {
public void testResourceLimitExceptionOnAddressFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
String destinationAddress = address + 1;
fillAddress(destinationAddress);
fillAddress(address + 1);
long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
assertTrue(addressSize >= maxSizeBytesRejectThreshold);
}
@Test
@ -367,6 +376,9 @@ public class ProtonTest extends ActiveMQTestBase {
}
assertTrue(e instanceof ResourceAllocationException);
assertTrue(e.getMessage().contains("resource-limit-exceeded"));
long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
assertTrue(addressSize >= maxSizeBytesRejectThreshold);
}
@Test
@ -393,6 +405,9 @@ public class ProtonTest extends ActiveMQTestBase {
// This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
assertTrue(sender.getSender().getCredit() == -1);
long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
}
finally {
amqpConnection.close();
@ -446,7 +461,7 @@ public class ProtonTest extends ActiveMQTestBase {
fillAddress(address + 1);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = amqpConnection = client.connect();
AmqpConnection amqpConnection = client.connect();
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address + 1);
@ -459,6 +474,43 @@ public class ProtonTest extends ActiveMQTestBase {
}
}
@Test
public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
// Create the link attach before filling the address to ensure the link is allocated credit.
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
sender.setPresettle(true);
fillAddress(address);
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[50 * 1024];
message.setBytes(payload);
Exception expectedException = null;
try {
session.begin();
sender.send(message);
session.commit();
}
catch (Exception e) {
expectedException = e;
}
finally {
amqpConnection.close();
}
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains("resource-limit-exceeded"));
assertTrue(expectedException.getMessage().contains("Address is full: " + address));
}
/**
* Fills an address. Careful when using this method. Only use when rejected messages are switched on.
* @param address
@ -520,6 +572,7 @@ public class ProtonTest extends ActiveMQTestBase {
timeout.await(5, TimeUnit.SECONDS);
System.out.println("Messages Sent: " + sentMessages);
if (errors[0] != null) {
throw errors[0];
}
@ -1313,7 +1366,8 @@ public class ProtonTest extends ActiveMQTestBase {
// For BLOCK tests
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
addressSettings.setMaxSizeBytes(maxSizeBytes);
addressSettings.setMaxSizeBytesRejectThreshold(maxSizeBytesRejectThreshold);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
}

View File

@ -306,6 +306,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
return false;
}
@Override
public boolean isRejectingMessages() {
return false;
}
@Override
public void applySetting(AddressSettings addressSettings) {