mirror of https://github.com/apache/activemq.git
[AMQ-6849] provide a virtualTopic dropOnResourceLimit attribute, default to false.
Enabling will allow dispatch to continue even when end to some consumer queues fail due to resource constraints. The existing systemUsage sendFailIfNoSpace and sendFailIfNoSpaceAfterTimeout are promoted to destination policy options allowing selective destinations to fail a send with an exception. The global system usage options are still in place. When dropOnResourceLimit=true dispatch will continue in the event that any subscription queue throws a resource limit exception on send failure.
This commit is contained in:
parent
be6f4b69c1
commit
6da08b245e
|
@ -608,6 +608,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
}
|
||||
}
|
||||
|
||||
private volatile ResourceAllocationException sendMemAllocationException = null;
|
||||
@Override
|
||||
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
|
||||
final ConnectionContext context = producerExchange.getConnectionContext();
|
||||
|
@ -641,10 +642,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
|
||||
}
|
||||
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
|
||||
throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
|
||||
+ message.getProducerId() + ") to prevent flooding "
|
||||
+ getActiveMQDestination().getQualifiedName() + "."
|
||||
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
|
||||
ResourceAllocationException resourceAllocationException = sendMemAllocationException;
|
||||
if (resourceAllocationException == null) {
|
||||
synchronized (this) {
|
||||
resourceAllocationException = sendMemAllocationException;
|
||||
if (resourceAllocationException == null) {
|
||||
sendMemAllocationException = resourceAllocationException = new ResourceAllocationException("Usage Manager Memory Limit reached on "
|
||||
+ getActiveMQDestination().getQualifiedName() + "."
|
||||
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
|
||||
}
|
||||
}
|
||||
}
|
||||
throw resourceAllocationException;
|
||||
}
|
||||
|
||||
// We can avoid blocking due to low usage if the producer is
|
||||
|
|
|
@ -109,6 +109,8 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
*/
|
||||
private int optimizeMessageStoreInFlightLimit = 10;
|
||||
private boolean persistJMSRedelivered = false;
|
||||
private int sendFailIfNoSpace = -1;
|
||||
private long sendFailIfNoSpaceAfterTimeout = -1;
|
||||
|
||||
|
||||
public void configure(Broker broker,Queue queue) {
|
||||
|
@ -147,7 +149,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
*
|
||||
* If includedProperties is null then all of the properties will be set as
|
||||
* isUpdate will return true
|
||||
* @param baseDestination
|
||||
* @param queue
|
||||
* @param includedProperties
|
||||
*/
|
||||
public void update(Queue queue, Set<String> includedProperties) {
|
||||
|
@ -309,6 +311,12 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
}
|
||||
destination.setSlowConsumerStrategy(scs);
|
||||
destination.setPrioritizedMessages(isPrioritizedMessages());
|
||||
if (sendFailIfNoSpace != -1) {
|
||||
destination.getSystemUsage().setSendFailIfNoSpace(isSendFailIfNoSpace());
|
||||
}
|
||||
if (sendFailIfNoSpaceAfterTimeout != 0) {
|
||||
destination.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(getSendFailIfNoSpaceAfterTimeout());
|
||||
}
|
||||
}
|
||||
|
||||
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
||||
|
@ -1100,4 +1108,24 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public String toString() {
|
||||
return "PolicyEntry [" + destination + "]";
|
||||
}
|
||||
|
||||
public void setSendFailIfNoSpace(boolean val) {
|
||||
if (val) {
|
||||
this.sendFailIfNoSpace = 1;
|
||||
} else {
|
||||
this.sendFailIfNoSpace = 0;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSendFailIfNoSpace() {
|
||||
return sendFailIfNoSpace == 1;
|
||||
}
|
||||
|
||||
public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) {
|
||||
this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout;
|
||||
}
|
||||
|
||||
public long getSendFailIfNoSpaceAfterTimeout() {
|
||||
return this.sendFailIfNoSpaceAfterTimeout;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ public class VirtualTopic implements VirtualDestination {
|
|||
private boolean local = false;
|
||||
private boolean concurrentSend = false;
|
||||
private boolean transactedSend = false;
|
||||
private boolean dropOnResourceLimit = false;
|
||||
|
||||
@Override
|
||||
public ActiveMQDestination getVirtualDestination() {
|
||||
|
@ -243,4 +244,12 @@ public class VirtualTopic implements VirtualDestination {
|
|||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isDropOnResourceLimit() {
|
||||
return dropOnResourceLimit;
|
||||
}
|
||||
|
||||
public void setDropOnResourceLimit(boolean dropOnResourceLimit) {
|
||||
this.dropOnResourceLimit = dropOnResourceLimit;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,8 @@ import org.apache.activemq.command.LocalTransactionId;
|
|||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.util.LRUCache;
|
||||
|
||||
import javax.jms.ResourceAllocationException;
|
||||
|
||||
/**
|
||||
* A Destination which implements <a href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
|
||||
*/
|
||||
|
@ -44,6 +46,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
|
|||
private final boolean local;
|
||||
private final boolean concurrentSend;
|
||||
private final boolean transactedSend;
|
||||
private final boolean dropMessageOnResourceLimit;
|
||||
|
||||
private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
|
||||
|
||||
|
@ -54,6 +57,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
|
|||
this.local = virtualTopic.isLocal();
|
||||
this.concurrentSend = virtualTopic.isConcurrentSend();
|
||||
this.transactedSend = virtualTopic.isTransactedSend();
|
||||
this.dropMessageOnResourceLimit = virtualTopic.isDropOnResourceLimit();
|
||||
}
|
||||
|
||||
public Topic getTopic() {
|
||||
|
@ -93,6 +97,10 @@ public class VirtualTopicInterceptor extends DestinationFilter {
|
|||
if (exceptionAtomicReference.get() == null) {
|
||||
dest.send(context, copy(message, dest.getActiveMQDestination()));
|
||||
}
|
||||
} catch (ResourceAllocationException e) {
|
||||
if (!dropMessageOnResourceLimit) {
|
||||
exceptionAtomicReference.set(e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
exceptionAtomicReference.set(e);
|
||||
} finally {
|
||||
|
@ -112,7 +120,13 @@ public class VirtualTopicInterceptor extends DestinationFilter {
|
|||
} else {
|
||||
for (final Destination dest : destinations) {
|
||||
if (shouldDispatch(broker, message, dest)) {
|
||||
dest.send(context, copy(message, dest.getActiveMQDestination()));
|
||||
try {
|
||||
dest.send(context, copy(message, dest.getActiveMQDestination()));
|
||||
} catch (ResourceAllocationException e) {
|
||||
if (!dropMessageOnResourceLimit) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -55,9 +56,9 @@ public class MemoryTransactionStore implements TransactionStore {
|
|||
|
||||
public class Tx {
|
||||
|
||||
public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
|
||||
public List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>());
|
||||
|
||||
public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
|
||||
public final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>());
|
||||
|
||||
public void add(AddMessageCommand msg) {
|
||||
messages.add(msg);
|
||||
|
@ -239,8 +240,13 @@ public class MemoryTransactionStore implements TransactionStore {
|
|||
public Tx getTx(Object txid) {
|
||||
Tx tx = inflightTransactions.get(txid);
|
||||
if (tx == null) {
|
||||
tx = new Tx();
|
||||
inflightTransactions.put(txid, tx);
|
||||
synchronized (inflightTransactions) {
|
||||
tx = inflightTransactions.get(txid);
|
||||
if ( tx == null) {
|
||||
tx = new Tx();
|
||||
inflightTransactions.put(txid, tx);
|
||||
}
|
||||
}
|
||||
}
|
||||
return tx;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb;
|
|||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -72,9 +73,9 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
}
|
||||
|
||||
public class Tx {
|
||||
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
|
||||
private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>());
|
||||
|
||||
private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
|
||||
private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>());
|
||||
|
||||
public void add(AddMessageCommand msg) {
|
||||
messages.add(msg);
|
||||
|
@ -248,8 +249,13 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
public Tx getTx(Object txid) {
|
||||
Tx tx = inflightTransactions.get(txid);
|
||||
if (tx == null) {
|
||||
tx = new Tx();
|
||||
inflightTransactions.put(txid, tx);
|
||||
synchronized (inflightTransactions) {
|
||||
tx = inflightTransactions.get(txid);
|
||||
if (tx == null) {
|
||||
tx = new Tx();
|
||||
inflightTransactions.put(txid, tx);
|
||||
}
|
||||
}
|
||||
}
|
||||
return tx;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.virtual;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualDestination;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualTopic;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class VirtualTopicFlowControlDiscardTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicFlowControlDiscardTest.class);
|
||||
|
||||
final String payload = new String(new byte[155]);
|
||||
int numConsumers = 2;
|
||||
int total = 500;
|
||||
|
||||
@Parameterized.Parameter(0)
|
||||
public boolean concurrentSend;
|
||||
|
||||
@Parameterized.Parameter(1)
|
||||
public boolean transactedSend;
|
||||
|
||||
@Parameterized.Parameter(2)
|
||||
public boolean sendFailGlobal;
|
||||
|
||||
@Parameterized.Parameter(3)
|
||||
public boolean persistentBroker;
|
||||
|
||||
|
||||
BrokerService brokerService;
|
||||
ConnectionFactory connectionFactory;
|
||||
|
||||
@Before
|
||||
public void createBroker() throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
brokerService.setPersistent(persistentBroker);
|
||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry restrictedUsage = new PolicyEntry();
|
||||
restrictedUsage.setCursorMemoryHighWaterMark(50);
|
||||
restrictedUsage.setMemoryLimit(5000);
|
||||
restrictedUsage.setCursorMemoryHighWaterMark(110);
|
||||
if (sendFailGlobal) {
|
||||
brokerService.getSystemUsage().setSendFailIfNoSpace(true);
|
||||
} else {
|
||||
restrictedUsage.setSendFailIfNoSpace(true);
|
||||
restrictedUsage.setSendFailIfNoSpaceAfterTimeout(0);
|
||||
}
|
||||
|
||||
policyMap.put(new ActiveMQQueue("Consumer.0.VirtualTopic.TEST"), restrictedUsage);
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
brokerService.start();
|
||||
|
||||
for (DestinationInterceptor destinationInterceptor : brokerService.getDestinationInterceptors()) {
|
||||
for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) {
|
||||
if (virtualDestination instanceof VirtualTopic) {
|
||||
((VirtualTopic) virtualDestination).setConcurrentSend(concurrentSend);
|
||||
((VirtualTopic) virtualDestination).setTransactedSend(transactedSend);
|
||||
((VirtualTopic) virtualDestination).setDropOnResourceLimit(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
|
||||
ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
|
||||
zeroPrefetch.setAll(0);
|
||||
activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
|
||||
connectionFactory = activeMQConnectionFactory;
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
brokerService.stop();
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name ="cS=#{0},tS=#{1},g=#{2},persist=#{3}")
|
||||
public static Iterable<Object[]> parameters() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, Boolean.TRUE},
|
||||
{Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, Boolean.FALSE},
|
||||
{Boolean.FALSE, Boolean.TRUE, Boolean.TRUE, Boolean.FALSE},
|
||||
{Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE},
|
||||
{Boolean.FALSE, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE},
|
||||
{Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE},
|
||||
{Boolean.TRUE, Boolean.TRUE, Boolean.FALSE, Boolean.FALSE},
|
||||
{Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.TRUE},
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFanoutWithResourceException() throws Exception {
|
||||
|
||||
Connection connection1 = connectionFactory.createConnection();
|
||||
connection1.start();
|
||||
|
||||
Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
for (int i=0; i<numConsumers; i++) {
|
||||
session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"));
|
||||
}
|
||||
|
||||
Connection connection2 = connectionFactory.createConnection();
|
||||
connection2.start();
|
||||
Session producerSession = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(new ActiveMQTopic("VirtualTopic.TEST"));
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
LOG.info("Starting producer: " + start);
|
||||
for (int i = 0; i < total; i++) {
|
||||
producer.send(producerSession.createTextMessage(payload));
|
||||
}
|
||||
LOG.info("Done producer, duration: " + (System.currentTimeMillis() - start) );
|
||||
|
||||
Destination destination = brokerService.getDestination(new ActiveMQQueue("Consumer.0.VirtualTopic.TEST"));
|
||||
LOG.info("Dest 0 size: " + (destination.getDestinationStatistics().getEnqueues().getCount()));
|
||||
assertTrue("did not get all", (destination.getDestinationStatistics().getEnqueues().getCount() < total));
|
||||
|
||||
assertTrue("got all", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
Destination dest = brokerService.getDestination(new ActiveMQQueue("Consumer.1.VirtualTopic.TEST"));
|
||||
LOG.info("Dest 1 size: " + dest.getDestinationStatistics().getEnqueues().getCount());
|
||||
return total == dest.getDestinationStatistics().getEnqueues().getCount();
|
||||
}
|
||||
}));
|
||||
|
||||
try {
|
||||
connection1.close();
|
||||
} catch (Exception ex) {}
|
||||
try {
|
||||
connection2.close();
|
||||
} catch (Exception ex) {}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue