The Producer MemoryLimit can lead to network deadlock when spooling is disabled.

So we now disable using it when sooling is used on a queue.

see:
https://issues.apache.org/activemq/browse/AMQ-1606



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@632455 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-02-29 21:59:33 +00:00
parent c0b00b2d28
commit 13829eff1d
9 changed files with 70 additions and 40 deletions

View File

@ -160,6 +160,7 @@ public class BrokerService implements Service {
private CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean supportFailOver;
private boolean clustered;
private Broker regionBroker;
static {
@ -1430,7 +1431,7 @@ public class BrokerService implements Service {
* @throws
*/
protected Broker createBroker() throws Exception {
Broker regionBroker = createRegionBroker();
regionBroker = createRegionBroker();
Broker broker = addInterceptors(regionBroker);
// Add a filter that will stop access to the broker once stopped
@ -1488,7 +1489,7 @@ public class BrokerService implements Service {
DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
RegionBroker regionBroker = null;
if (destinationFactory == null) {
destinationFactory = new DestinationFactoryImpl(getProducerSystemUsage(), getTaskRunnerFactory(), getPersistenceAdapter());
destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
}
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
@ -1797,4 +1798,12 @@ public class BrokerService implements Service {
}
}
}
public Broker getRegionBroker() {
return regionBroker;
}
public void setRegionBroker(Broker regionBroker) {
this.regionBroker = regionBroker;
}
}

View File

@ -126,7 +126,7 @@ public class ManagedRegionBroker extends RegionBroker {
}
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.jmx;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
@ -34,10 +35,10 @@ public class ManagedTempQueueRegion extends TempQueueRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
public ManagedTempQueueRegion(ManagedRegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = regionBroker;
super(broker, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = broker;
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {

View File

@ -16,7 +16,10 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerInfo;
@ -33,8 +36,8 @@ public abstract class BaseDestination implements Destination {
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
protected final SystemUsage systemUsage;
protected final MemoryUsage memoryUsage;
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
private int maxProducersToAudit=1024;
private int maxAuditDepth=2048;
@ -43,36 +46,41 @@ public abstract class BaseDestination implements Destination {
private boolean useCache=true;
private int minimumMessageSize=1024;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService;
/**
* @param broker
* @param store
* @param destination
* @param systemUsage
* @param parentStats
* @throws Exception
*/
public BaseDestination(Broker broker,MessageStore store,ActiveMQDestination destination, SystemUsage systemUsage,DestinationStatistics parentStats) {
this.broker=broker;
public BaseDestination(BrokerService brokerService,MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
this.brokerService = brokerService;
this.broker=brokerService.getBroker();
this.store=store;
this.destination=destination;
this.systemUsage=systemUsage;
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
// Let the store know what usage manager we are using so that he can
// flush messages to disk when usage gets high.
if (store != null) {
store.setMemoryUsage(this.memoryUsage);
}
// let's copy the enabled property from the parent DestinationStatistics
this.destinationStatistics.setEnabled(parentStats.isEnabled());
this.destinationStatistics.setParent(parentStats);
this.systemUsage = brokerService.getProducerSystemUsage();
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
}
/**
* initialize the destination
* @throws Exception
*/
public abstract void initialize() throws Exception;
public void initialize() throws Exception {
// Let the store know what usage manager we are using so that he can
// flush messages to disk when usage gets high.
if (store != null) {
store.setMemoryUsage(this.memoryUsage);
}
}
/**
* @return the producerFlowControl
*/

View File

@ -22,6 +22,7 @@ import java.util.Set;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
@ -44,13 +45,13 @@ import org.apache.activemq.usage.SystemUsage;
*/
public class DestinationFactoryImpl extends DestinationFactory {
protected final SystemUsage memoryManager;
protected final TaskRunnerFactory taskRunnerFactory;
protected final PersistenceAdapter persistenceAdapter;
protected RegionBroker broker;
private final BrokerService brokerService;
public DestinationFactoryImpl(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
this.memoryManager = memoryManager;
public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
this.brokerService = brokerService;
this.taskRunnerFactory = taskRunnerFactory;
if (persistenceAdapter == null) {
throw new IllegalArgumentException("null persistenceAdapter");
@ -76,7 +77,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
if (destination.isQueue()) {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume
@ -90,14 +91,14 @@ public class DestinationFactoryImpl extends DestinationFactory {
};
} else {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
queue.initialize();
return queue;
}
} else if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Topic(broker.getRoot(), destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
return new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
@ -113,7 +114,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
}
Topic topic = new Topic(broker.getRoot(), destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
topic.initialize();
return topic;

View File

@ -32,7 +32,7 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -65,7 +65,6 @@ import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -106,9 +105,9 @@ public class Queue extends BaseDestination implements Task {
}
};
public Queue(Broker broker, final ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
super(broker, store, destination,systemUsage, parentStats);
super(brokerService, store, destination, parentStats);
if (destination.isTemporary() || broker == null || store==null ) {
this.messages = new VMPendingMessageCursor();
@ -130,8 +129,17 @@ public class Queue extends BaseDestination implements Task {
this.dispatchSelector=new QueueDispatchSelector(destination);
}
public void initialize() throws Exception {
// If a VMPendingMessageCursor don't use the default Producer System Usage
// since it turns into a shared blocking queue which can lead to a network deadlock.
// If we are ccursoring to disk..it's not and issue because it does not block due
// to large disk sizes.
if( messages instanceof VMPendingMessageCursor ) {
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
}
super.initialize();
if (store != null) {
// Restore the persistent messages.
messages.setSystemUsage(systemUsage);

View File

@ -82,7 +82,7 @@ public class RegionBroker implements Broker {
private final Region topicRegion;
private final Region tempQueueRegion;
private final Region tempTopicRegion;
private BrokerService brokerService;
protected BrokerService brokerService;
private boolean started;
private boolean keepDurableSubsActive;
@ -161,7 +161,7 @@ public class RegionBroker implements Broker {
}
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {

View File

@ -17,9 +17,8 @@
package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
@ -34,18 +33,20 @@ import org.apache.commons.logging.LogFactory;
*/
public class TempQueueRegion extends AbstractTempRegion {
private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
private final BrokerService brokerService;
public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
public TempQueueRegion(RegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination
// Policy
// setAutoCreateDestinations(false);
this.brokerService = brokerService;
}
protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from

View File

@ -25,6 +25,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@ -87,9 +88,9 @@ public class Topic extends BaseDestination implements Task{
};
public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
super(broker, store, destination,systemUsage, parentStats);
super(brokerService, store, destination, parentStats);
this.topicStore=store;
//set default subscription recovery policy
if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){
@ -102,6 +103,7 @@ public class Topic extends BaseDestination implements Task{
}
public void initialize() throws Exception{
super.initialize();
if (store != null) {
int messageCount = store.getMessageCount();
destinationStatistics.getMessages().setCount(messageCount);