diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index b45f583ce1..2b1175dd8d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -87,6 +87,7 @@ import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; import org.apache.activemq.state.CommandVisitorAdapter; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; @@ -114,7 +115,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected boolean alwaysSessionAsync = true; private TaskRunnerFactory sessionTaskRunner; - private final ThreadPoolExecutor asyncConnectionThread; + private final ThreadPoolExecutor executor; // Connection state variables private final ConnectionInfo info; @@ -188,6 +189,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean useDedicatedTaskRunner; protected volatile CountDownLatch transportInterruptionProcessingComplete; private long consumerFailoverRedeliveryWaitPeriod; + private final Scheduler scheduler; /** * Construct an ActiveMQConnection @@ -204,16 +206,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // Configure a single threaded executor who's core thread can timeout if // idle - asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { public Thread newThread(Runnable r) { - Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport); + Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); thread.setDaemon(true); return thread; } }); // asyncConnectionThread.allowCoreThreadTimeOut(true); - - this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId())); + String uniqueId = CONNECTION_ID_GENERATOR.generateId(); + this.info = new ConnectionInfo(new ConnectionId(uniqueId)); this.info.setManageable(true); this.info.setFaultTolerant(transport.isFaultTolerant()); this.connectionSessionId = new SessionId(info.getConnectionId(), -1); @@ -224,6 +226,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.factoryStats.addConnection(this); this.timeCreated = System.currentTimeMillis(); this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); + this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler"); + this.scheduler.start(); } protected void setUserName(String userName) { @@ -609,6 +613,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon advisoryConsumer.dispose(); advisoryConsumer = null; } + if (this.scheduler != null) { + try { + this.scheduler.stop(); + } catch (Exception e) { + JMSException ex = JMSExceptionSupport.create(e); + throw ex; + } + } long lastDeliveredSequenceId = 0; for (Iterator i = this.sessions.iterator(); i.hasNext();) { @@ -656,8 +668,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } finally { try { - if (asyncConnectionThread != null){ - asyncConnectionThread.shutdown(); + if (executor != null){ + executor.shutdown(); } }catch(Throwable e) { LOG.error("Error shutting down thread pool " + e,e); @@ -1719,7 +1731,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon @Override public Response processConnectionError(final ConnectionError error) throws Exception { - asyncConnectionThread.execute(new Runnable() { + executor.execute(new Runnable() { public void run() { onAsyncException(error.getException()); } @@ -1779,7 +1791,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void onClientInternalException(final Throwable error) { if ( !closed.get() && !closing.get() ) { if ( this.clientInternalExceptionListener != null ) { - asyncConnectionThread.execute(new Runnable() { + executor.execute(new Runnable() { public void run() { ActiveMQConnection.this.clientInternalExceptionListener.onException(error); } @@ -1804,7 +1816,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } final JMSException e = (JMSException)error; - asyncConnectionThread.execute(new Runnable() { + executor.execute(new Runnable() { public void run() { ActiveMQConnection.this.exceptionListener.onException(e); } @@ -1819,7 +1831,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void onException(final IOException error) { onAsyncException(error); if (!closing.get() && !closed.get()) { - asyncConnectionThread.execute(new Runnable() { + executor.execute(new Runnable() { public void run() { transportFailed(error); ServiceSupport.dispose(ActiveMQConnection.this.transport); @@ -2297,4 +2309,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public long getConsumerFailoverRedeliveryWaitPeriod() { return consumerFailoverRedeliveryWaitPeriod; } + + protected Scheduler getScheduler() { + return this.scheduler; + } + + protected ThreadPoolExecutor getExecutor() { + return this.executor; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a9495e8e80..03c3b55ba9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -29,7 +29,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; - import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; @@ -37,7 +36,6 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.TransactionRolledBackException; - import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQDestination; @@ -111,7 +109,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class); - protected static final Scheduler scheduler = Scheduler.getInstance(); + protected final Scheduler scheduler; protected final ActiveMQSession session; protected final ConsumerInfo info; @@ -130,17 +128,17 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private int ackCounter; private int dispatchedCount; private final AtomicReference messageListener = new AtomicReference(); - private JMSConsumerStatsImpl stats; + private final JMSConsumerStatsImpl stats; private final String selector; private boolean synchronizationRegistered; - private AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); private MessageAvailableListener availableListener; private RedeliveryPolicy redeliveryPolicy; private boolean optimizeAcknowledge; - private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); + private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); private ExecutorService executorService; private MessageTransformer transformer; private boolean clearDispatchList; @@ -152,7 +150,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private IOException failureError; private long optimizeAckTimestamp = System.currentTimeMillis(); - private long optimizeAckTimeout = 300; + private final long optimizeAckTimeout = 300; private long failoverRedeliveryWaitPeriod = 0; /** @@ -202,6 +200,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } this.session = session; + this.scheduler = session.getScheduler(); this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); setTransformer(session.getTransformer()); @@ -634,10 +633,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (!unconsumedMessages.isClosed()) { if (session.getTransactionContext().isInTransaction()) { session.getTransactionContext().addSynchronization(new Synchronization() { + @Override public void afterCommit() throws Exception { doClose(); } + @Override public void afterRollback() throws Exception { doClose(); } @@ -912,16 +913,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (!synchronizationRegistered) { synchronizationRegistered = true; session.getTransactionContext().addSynchronization(new Synchronization() { + @Override public void beforeEnd() throws Exception { acknowledge(); synchronizationRegistered = false; } + @Override public void afterCommit() throws Exception { commit(); synchronizationRegistered = false; } + @Override public void afterRollback() throws Exception { rollback(); synchronizationRegistered = false; @@ -1325,6 +1329,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC unconsumedMessages.stop(); } + @Override public String toString() { return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() + " }"; diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 43e2e586f7..dd6e8201bd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -19,13 +19,11 @@ package org.apache.activemq; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; - import javax.jms.Destination; import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; @@ -73,9 +71,9 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl protected ProducerInfo info; protected boolean closed; - private JMSProducerStatsImpl stats; + private final JMSProducerStatsImpl stats; private AtomicLong messageSequence; - private long startTime; + private final long startTime; private MessageTransformer transformer; private MemoryUsage producerWindow; @@ -93,6 +91,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl // size > 0 if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) { producerWindow = new MemoryUsage("Producer Window: " + producerId); + producerWindow.setExecutor(session.getConnectionExecutor()); producerWindow.setLimit(this.info.getWindowSize()); producerWindow.start(); } @@ -164,6 +163,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl * * @throws IllegalStateException */ + @Override protected void checkClosed() throws IllegalStateException { if (closed) { throw new IllegalStateException("The producer is closed"); @@ -280,6 +280,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl this.info = info; } + @Override public String toString() { return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 185e7ba83c..8883808268 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -24,8 +24,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; - import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.IllegalStateException; @@ -53,7 +53,6 @@ import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.TransactionRolledBackException; - import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobUploader; @@ -198,7 +197,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } private static final Log LOG = LogFactory.getLog(ActiveMQSession.class); - protected static final Scheduler scheduler = Scheduler.getInstance(); + private final Scheduler scheduler; + private final ThreadPoolExecutor connectionExecutor; protected int acknowledgementMode; protected final ActiveMQConnection connection; @@ -220,7 +220,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta protected Object sendMutex = new Object(); private MessageListener messageListener; - private JMSSessionStatsImpl stats; + private final JMSSessionStatsImpl stats; private TransactionContext transactionContext; private DeliveryListener deliveryListener; private MessageTransformer transformer; @@ -251,7 +251,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta this.connection.asyncSendPacket(info); setTransformer(connection.getTransformer()); setBlobTransferPolicy(connection.getBlobTransferPolicy()); - + this.scheduler=connection.getScheduler(); + this.connectionExecutor=connection.getExecutor(); if (connection.isStarted()) { start(); } @@ -613,11 +614,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta synchronizationRegistered = true; getTransactionContext().addSynchronization(new Synchronization() { + @Override public void afterCommit() throws Exception { doClose(); synchronizationRegistered = false; } + @Override public void afterRollback() throws Exception { doClose(); synchronizationRegistered = false; @@ -846,6 +849,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta if (ack.getTransactionId() != null) { getTransactionContext().addSynchronization(new Synchronization() { + @Override public void afterRollback() throws Exception { md.getMessage().onMessageRolledBack(); // ensure we don't filter this as a duplicate @@ -1947,6 +1951,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta return executor.getUnconsumedMessages(); } + @Override public String toString() { return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}"; } @@ -2025,4 +2030,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta syncSendPacket(ack); } } + + protected Scheduler getScheduler() { + return this.scheduler; + } + + protected ThreadPoolExecutor getConnectionExecutor() { + return this.connectionExecutor; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index 068ce41491..b118c07b31 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker; import java.net.URI; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.Service; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -33,6 +34,7 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -372,6 +374,10 @@ public interface Broker extends Region, Service { * configuration */ void nowMasterBroker(); + + Scheduler getScheduler(); + + ThreadPoolExecutor getExecutor(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 50f2f765dd..8149584704 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import java.net.URI; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -40,6 +41,7 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -300,4 +302,12 @@ public class BrokerFilter implements Broker { ConsumerControl control) { next.processConsumerControl(consumerExchange, control); } + + public Scheduler getScheduler() { + return next.getScheduler(); + } + + public ThreadPoolExecutor getExecutor() { + return next.getExecutor(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 38f39b4bea..6451600fcc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -29,6 +29,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PostConstruct; @@ -78,9 +81,10 @@ import org.apache.activemq.security.SecurityContext; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapterFactory; -import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; @@ -188,9 +192,10 @@ public class BrokerService implements Service { private IOExceptionHandler ioExceptionHandler; private boolean schedulerSupport = true; private File schedulerDirectoryFile; - + private Scheduler scheduler; + private ThreadPoolExecutor executor; private boolean slave = true; - + static { String localHostName = "localhost"; try { @@ -589,6 +594,15 @@ public class BrokerService implements Service { } } } + if (this.taskRunnerFactory != null) { + this.taskRunnerFactory.shutdown(); + } + if (this.scheduler != null) { + this.scheduler.stop(); + } + if (this.executor != null) { + this.executor.shutdownNow(); + } LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped"); synchronized (shutdownHooks) { for (Runnable hook : shutdownHooks) { @@ -756,9 +770,6 @@ public class BrokerService implements Service { } public PersistenceAdapterFactory getPersistenceFactory() { - if (persistenceFactory == null) { - persistenceFactory = createPersistenceFactory(); - } return persistenceFactory; } @@ -848,6 +859,7 @@ public class BrokerService implements Service { try { if (systemUsage == null) { systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore()); + systemUsage.setExecutor(getExecutor()); systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default // 64 // Meg @@ -869,6 +881,9 @@ public class BrokerService implements Service { removeService(this.systemUsage); } this.systemUsage = memoryManager; + if (this.systemUsage.getExecutor()==null) { + this.systemUsage.setExecutor(getExecutor()); + } addService(this.systemUsage); } @@ -953,11 +968,11 @@ public class BrokerService implements Service { } public TaskRunnerFactory getTaskRunnerFactory() { - if (taskRunnerFactory == null) { - taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000, + if (this.taskRunnerFactory == null) { + this.taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000, isDedicatedTaskRunner()); } - return taskRunnerFactory; + return this.taskRunnerFactory; } public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { @@ -1769,10 +1784,10 @@ public class BrokerService implements Service { RegionBroker regionBroker; if (isUseJmx()) { regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), - getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor); + getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); } else { regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, - destinationInterceptor); + destinationInterceptor,getScheduler(),getExecutor()); } destinationFactory.setRegionBroker(regionBroker); regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); @@ -1850,20 +1865,20 @@ public class BrokerService implements Service { protected PersistenceAdapter createPersistenceAdapter() throws IOException { if (isPersistent()) { - return getPersistenceFactory().createPersistenceAdapter(); + PersistenceAdapterFactory fac = getPersistenceFactory(); + if (fac != null) { + return fac.createPersistenceAdapter(); + }else { + KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter(); + File dir = new File(getBrokerDataDirectory(),"KahaDB"); + adaptor.setDirectory(dir); + return adaptor; + } } else { return new MemoryPersistenceAdapter(); } } - protected AMQPersistenceAdapterFactory createPersistenceFactory() { - AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory(); - factory.setDataDirectory(getBrokerDataDirectory()); - factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory()); - factory.setBrokerName(getBrokerName()); - return factory; - } - protected ObjectName createBrokerObjectName() throws IOException { try { return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" @@ -2124,6 +2139,31 @@ public class BrokerService implements Service { } } } + + protected synchronized ThreadPoolExecutor getExecutor() { + if (this.executor == null) { + this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Usage Async Task"); + thread.setDaemon(true); + return thread; + } + }); + } + return this.executor; + } + + protected synchronized Scheduler getScheduler() { + if (this.scheduler==null) { + this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); + try { + this.scheduler.start(); + } catch (Exception e) { + LOG.error("Failed to start Scheduler ",e); + } + } + return this.scheduler; + } public Broker getRegionBroker() { return regionBroker; @@ -2251,7 +2291,5 @@ public class BrokerService implements Service { public void setSchedulerDirectory(String schedulerDirectory) { setSchedulerDirectoryFile(new File(schedulerDirectory)); - } - - + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 9ce2c72381..12834a85a9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -41,6 +42,7 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -283,4 +285,12 @@ public class EmptyBroker implements Broker { public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { } + + public Scheduler getScheduler() { + return null; + } + + public ThreadPoolExecutor getExecutor() { + return null; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index db88ce90bb..dcface8bbf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -41,6 +42,7 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -302,4 +304,12 @@ public class ErrorBroker implements Broker { ConsumerControl control) { throw new BrokerStoppedException(this.message); } + + public Scheduler getScheduler() { + throw new BrokerStoppedException(this.message); + } + + public ThreadPoolExecutor getExecutor() { + throw new BrokerStoppedException(this.message); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index d6f1670109..8c120326de 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import java.net.URI; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -41,6 +42,7 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; /** @@ -312,4 +314,12 @@ public class MutableBrokerFilter implements Broker { getNext().processConsumerControl(consumerExchange, control); } + public Scheduler getScheduler() { + return getNext().getScheduler(); + } + + public ThreadPoolExecutor getExecutor() { + return getNext().getExecutor(); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 37fded5bb8..18fbdc8a65 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -16,6 +16,28 @@ */ package org.apache.activemq.broker.jmx; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ThreadPoolExecutor; +import javax.management.InstanceNotFoundException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -41,6 +63,7 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.JMXSupport; @@ -48,27 +71,6 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import javax.management.InstanceNotFoundException; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; public class ManagedRegionBroker extends RegionBroker { private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class); @@ -91,18 +93,20 @@ public class ManagedRegionBroker extends RegionBroker { private Broker contextBroker; public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, - DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { - super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor); + DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { + super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor); this.managementContext = context; this.brokerObjectName = brokerObjectName; } + @Override public void start() throws Exception { super.start(); // build all existing durable subscriptions buildExistingSubscriptions(); } + @Override protected void doStop(ServiceStopper stopper) { super.doStop(stopper); // lets remove any mbeans not yet removed @@ -119,18 +123,22 @@ public class ManagedRegionBroker extends RegionBroker { registeredMBeans.clear(); } + @Override protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } + @Override protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } + @Override protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } + @Override protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 1f11f0537c..063998c910 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -17,9 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; - import javax.jms.ResourceAllocationException; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java index 523c71c421..bc16f6ab9e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java @@ -55,6 +55,7 @@ public class DestinationFactoryImpl extends DestinationFactory { this.persistenceAdapter = persistenceAdapter; } + @Override public void setRegionBroker(RegionBroker broker) { if (broker == null) { throw new IllegalArgumentException("null broker"); @@ -62,6 +63,7 @@ public class DestinationFactoryImpl extends DestinationFactory { this.broker = broker; } + @Override public Set getDestinations() { return persistenceAdapter.getDestinations(); } @@ -69,6 +71,7 @@ public class DestinationFactoryImpl extends DestinationFactory { /** * @return instance of {@link Queue} or {@link Topic} */ + @Override public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception { if (destination.isQueue()) { if (destination.isTemporary()) { @@ -100,6 +103,7 @@ public class DestinationFactoryImpl extends DestinationFactory { } } + @Override public void removeDestination(Destination dest) { ActiveMQDestination destination = dest.getActiveMQDestination(); if (!destination.isTemporary()) { @@ -131,11 +135,12 @@ public class DestinationFactoryImpl extends DestinationFactory { if (broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) { - entry.configure(topic); + entry.configure(broker,topic); } } } + @Override public long getLastMessageBrokerSequenceId() throws IOException { return persistenceAdapter.getLastMessageBrokerSequenceId(); } @@ -144,6 +149,7 @@ public class DestinationFactoryImpl extends DestinationFactory { return persistenceAdapter; } + @Override public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException { return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 056a74dfb7..3843ed73e0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -23,10 +23,8 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import javax.jms.InvalidSelectorException; import javax.jms.JMSException; - import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -55,7 +53,7 @@ import org.apache.commons.logging.LogFactory; public abstract class PrefetchSubscription extends AbstractSubscription { private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class); - protected static final Scheduler scheduler = Scheduler.getInstance(); + protected final Scheduler scheduler; protected PendingMessageCursor pending; protected final List dispatched = new CopyOnWriteArrayList(); @@ -70,12 +68,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription { private final Object pendingLock = new Object(); private final Object dispatchLock = new Object(); protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(); - private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); + private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException { super(broker,context, info); this.usageManager=usageManager; pending = cursor; + this.scheduler = broker.getScheduler(); } public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { @@ -230,6 +229,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { context.getTransaction().addSynchronization( new Synchronization() { + @Override public void afterCommit() throws Exception { synchronized(dispatchLock) { @@ -239,6 +239,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } + @Override public void afterRollback() throws Exception { synchronized(dispatchLock) { if (isSlave()) { @@ -486,6 +487,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9); } + @Override public int countBeforeFull() { return info.getPrefetchSize() + prefetchExtension - dispatched.size(); } @@ -510,6 +512,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { return enqueueCounter; } + @Override public boolean isRecoveryRequired() { return pending.isRecoveryRequired(); } @@ -526,13 +529,15 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } - public void add(ConnectionContext context, Destination destination) throws Exception { + @Override +public void add(ConnectionContext context, Destination destination) throws Exception { synchronized(pendingLock) { super.add(context, destination); pending.add(context, destination); } } + @Override public List remove(ConnectionContext context, Destination destination) throws Exception { List rc = new ArrayList(); synchronized(pendingLock) { @@ -546,7 +551,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { synchronized(dispatchLock) { for (MessageReference r : dispatched) { if( r.getRegionDestination() == destination) { - rc.add((QueueMessageReference)r); + rc.add(r); } } destination.getDestinationStatistics().getDispatched().subtract(dispatched.size()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index a82aab296b..5f6d9b397f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -125,7 +125,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { }; private final Object iteratingMutex = new Object() {}; - private static final Scheduler scheduler = Scheduler.getInstance(); + private final Scheduler scheduler; class TimeoutMessage implements Delayed { @@ -203,6 +203,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { super(brokerService, store, destination, parentStats); this.taskFactory = taskFactory; this.dispatchSelector = new QueueDispatchSelector(destination); + this.scheduler = brokerService.getBroker().getScheduler(); } public List getConsumers() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 19196a8931..303366339c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadPoolExecutor; import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import org.apache.activemq.broker.Broker; @@ -57,6 +58,7 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.TransactionId; import org.apache.activemq.state.ConnectionState; import org.apache.activemq.store.kahadb.plist.PListStore; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.BrokerSupport; @@ -98,10 +100,14 @@ public class RegionBroker extends EmptyBroker { private final Map clientIdSet = new HashMap(); private final DestinationInterceptor destinationInterceptor; private ConnectionContext adminConnectionContext; + private final Scheduler scheduler; + private final ThreadPoolExecutor executor; public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, - DestinationInterceptor destinationInterceptor) throws IOException { + DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { this.brokerService = brokerService; + this.executor=executor; + this.scheduler = scheduler; if (destinationFactory == null) { throw new IllegalArgumentException("null destinationFactory"); } @@ -810,6 +816,16 @@ public class RegionBroker extends EmptyBroker { } } + + @Override + public Scheduler getScheduler() { + return this.scheduler; + } + + public ThreadPoolExecutor getExecutor() { + return this.executor; + } + @Override public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { ActiveMQDestination destination = control.getDestination(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 6af2e8b795..bcbebef8ac 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -231,7 +231,7 @@ public class TopicRegion extends AbstractRegion { if (broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) { - entry.configure(topic); + entry.configure(broker,topic); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java index ff70b931ce..dea16dbdd1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java @@ -5,7 +5,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Subscription; @@ -24,15 +23,19 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class); - private static final Scheduler scheduler = Scheduler.getInstance(); - private AtomicBoolean taskStarted = new AtomicBoolean(false); - private Map slowConsumers = new ConcurrentHashMap(); + private Scheduler scheduler; + private final AtomicBoolean taskStarted = new AtomicBoolean(false); + private final Map slowConsumers = new ConcurrentHashMap(); private long maxSlowCount = -1; private long maxSlowDuration = 30*1000; private long checkPeriod = 30*1000; private boolean abortConnection = false; + public void setScheduler(Scheduler s) { + this.scheduler=s; + } + public void slowConsumer(ConnectionContext context, Subscription subs) { if (maxSlowCount < 0 && maxSlowDuration < 0) { // nothing to do diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java index 77690457d3..9476575e9a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java @@ -18,7 +18,7 @@ package org.apache.activemq.broker.region.policy; import java.util.ArrayList; import java.util.List; - +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -118,4 +118,7 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover return result.toArray(new Message[result.size()]); } + public void setBroker(Broker broker) { + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java index c29c3f5a2b..955318fb90 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy; import java.util.Iterator; import java.util.List; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -109,6 +110,9 @@ public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecover public Message[] browse(ActiveMQDestination destination) throws Exception { return buffer.browse(destination); } + + public void setBroker(Broker broker) { + } // Implementation methods diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java index d0b7177547..e663573543 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy; import java.util.ArrayList; import java.util.List; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -68,5 +69,8 @@ public class LastImageSubscriptionRecoveryPolicy implements SubscriptionRecovery public SubscriptionRecoveryPolicy copy() { return new LastImageSubscriptionRecoveryPolicy(); } + + public void setBroker(Broker broker) { + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java index c469a50100..6cb08038ee 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -52,5 +53,8 @@ public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy public Message[] browse(ActiveMQDestination dest) throws Exception { return new Message[0]; } + + public void setBroker(Broker broker) { + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 7d84add6fb..5a396d7ba4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -90,7 +90,7 @@ public class PolicyEntry extends DestinationMapEntry { public void configure(Broker broker,Queue queue) { - baseConfiguration(queue); + baseConfiguration(broker,queue); if (dispatchPolicy != null) { queue.setDispatchPolicy(dispatchPolicy); } @@ -112,14 +112,16 @@ public class PolicyEntry extends DestinationMapEntry { queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); } - public void configure(Topic topic) { - baseConfiguration(topic); + public void configure(Broker broker,Topic topic) { + baseConfiguration(broker,topic); if (dispatchPolicy != null) { topic.setDispatchPolicy(dispatchPolicy); } topic.setDeadLetterStrategy(getDeadLetterStrategy()); if (subscriptionRecoveryPolicy != null) { - topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy()); + SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy(); + srp.setBroker(broker); + topic.setSubscriptionRecoveryPolicy(srp); } if (memoryLimit > 0) { topic.getMemoryUsage().setLimit(memoryLimit); @@ -127,7 +129,7 @@ public class PolicyEntry extends DestinationMapEntry { topic.setLazyDispatch(isLazyDispatch()); } - public void baseConfiguration(BaseDestination destination) { + public void baseConfiguration(Broker broker,BaseDestination destination) { destination.setProducerFlowControl(isProducerFlowControl()); destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); destination.setEnableAudit(isEnableAudit()); @@ -148,7 +150,11 @@ public class PolicyEntry extends DestinationMapEntry { destination.setMaxExpirePageSize(getMaxExpirePageSize()); destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); - destination.setSlowConsumerStrategy(getSlowConsumerStrategy()); + SlowConsumerStrategy scs = getSlowConsumerStrategy(); + if (scs != null) { + scs.setScheduler(broker.getScheduler()); + } + destination.setSlowConsumerStrategy(scs); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java index 0713f28f1a..15a17dcd38 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java @@ -17,12 +17,11 @@ package org.apache.activemq.broker.region.policy; import java.util.concurrent.atomic.AtomicLong; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import org.apache.activemq.ActiveMQMessageTransformation; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -50,9 +49,9 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover private static final Log LOG = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class); private MessageQuery query; - private AtomicLong messageSequence = new AtomicLong(0); - private IdGenerator idGenerator = new IdGenerator(); - private ProducerId producerId = createProducerId(); + private final AtomicLong messageSequence = new AtomicLong(0); + private final IdGenerator idGenerator = new IdGenerator(); + private final ProducerId producerId = createProducerId(); public SubscriptionRecoveryPolicy copy() { QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy(); @@ -99,6 +98,9 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception { return new org.apache.activemq.command.Message[0]; } + + public void setBroker(Broker broker) { + } protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) { try { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java index d36db724fa..600d2e0871 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java @@ -2,6 +2,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.thread.Scheduler; /* * a strategy for dealing with slow consumers @@ -9,5 +10,6 @@ import org.apache.activemq.broker.region.Subscription; public interface SlowConsumerStrategy { void slowConsumer(ConnectionContext context, Subscription subs); + void setScheduler(Scheduler scheduler); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java index 1b3918f451..b6ac63efd7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.Service; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -69,4 +70,6 @@ public interface SubscriptionRecoveryPolicy extends Service { * @return the copy */ SubscriptionRecoveryPolicy copy(); + + void setBroker(Broker broker); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java index 8be2552bcb..a20af8535d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.SubscriptionRecovery; @@ -28,7 +29,6 @@ import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.thread.Scheduler; /** @@ -42,7 +42,7 @@ import org.apache.activemq.thread.Scheduler; public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { private static final int GC_INTERVAL = 1000; - protected static final Scheduler scheduler = Scheduler.getInstance(); + private Scheduler scheduler; // TODO: need to get a better synchronized linked list that has little // contention between enqueuing and dequeuing @@ -89,6 +89,10 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli } } } + + public void setBroker(Broker broker) { + this.scheduler = broker.getScheduler(); + } public void start() throws Exception { scheduler.executePeriodically(gcTask, GC_INTERVAL); @@ -97,6 +101,7 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli public void stop() throws Exception { scheduler.cancel(gcTask); } + public void gc() { lastGCRun = System.currentTimeMillis(); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java index 1116b3d924..d01366da0d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java @@ -35,7 +35,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand; import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey; import org.apache.activemq.thread.Scheduler; @@ -75,7 +74,7 @@ public class AsyncDataManager { public static final int PREFERED_DIFF = 1024 * 512; private static final Log LOG = LogFactory.getLog(AsyncDataManager.class); - protected static Scheduler scheduler = Scheduler.getInstance(); + protected Scheduler scheduler; protected final Map inflightWrites = new ConcurrentHashMap(); @@ -193,7 +192,13 @@ public class AsyncDataManager { cleanup(); } }; - scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); + this.scheduler = new Scheduler("AsyncDataManager Scheduler"); + try { + this.scheduler.start(); + } catch (Exception e) { + throw new IOException(e); + } + this.scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); } public void lock() throws IOException { @@ -328,7 +333,12 @@ public class AsyncDataManager { if (!started) { return; } - scheduler.cancel(cleanupTask); + this.scheduler.cancel(cleanupTask); + try { + this.scheduler.stop(); + } catch (Exception e) { + throw new IOException(e); + } accessorPool.close(); storeState(false); appender.close(); @@ -376,7 +386,7 @@ public class AsyncDataManager { public synchronized void addInterestInFile(int file) throws IOException { if (file >= 0) { Integer key = Integer.valueOf(file); - DataFile dataFile = (DataFile)fileMap.get(key); + DataFile dataFile = fileMap.get(key); if (dataFile == null) { throw new IOException("That data file does not exist"); } @@ -393,7 +403,7 @@ public class AsyncDataManager { public synchronized void removeInterestInFile(int file) throws IOException { if (file >= 0) { Integer key = Integer.valueOf(file); - DataFile dataFile = (DataFile)fileMap.get(key); + DataFile dataFile = fileMap.get(key); removeInterestInFile(dataFile); } @@ -414,7 +424,7 @@ public class AsyncDataManager { List purgeList = new ArrayList(); for (Integer key : unUsed) { - DataFile dataFile = (DataFile)fileMap.get(key); + DataFile dataFile = fileMap.get(key); purgeList.add(dataFile); } for (DataFile dataFile : purgeList) { @@ -432,7 +442,7 @@ public class AsyncDataManager { for (Integer key : unUsed) { // Only add files less than the lastFile.. if( key.intValue() < lastFile.intValue() ) { - DataFile dataFile = (DataFile)fileMap.get(key); + DataFile dataFile = fileMap.get(key); purgeList.add(dataFile); } } @@ -499,6 +509,7 @@ public class AsyncDataManager { this.maxFileLength = maxFileLength; } + @Override public String toString() { return "DataManager:(" + filePrefix + ")"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 0213d75022..68254f903b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.activeio.journal.Journal; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -58,7 +57,6 @@ import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicReferenceStore; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; @@ -85,7 +83,7 @@ import org.apache.commons.logging.LogFactory; public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware { private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class); - private static final Scheduler scheduler = Scheduler.getInstance(); + private Scheduler scheduler; private final ConcurrentHashMap queues = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap(); private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; @@ -99,7 +97,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private SystemUsage usageManager; private long checkpointInterval = 1000 * 20; private int maxCheckpointMessageAddSize = 1024 * 4; - private AMQTransactionStore transactionStore = new AMQTransactionStore(this); + private final AMQTransactionStore transactionStore = new AMQTransactionStore(this); private TaskRunner checkpointTask; private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private final AtomicBoolean started = new AtomicBoolean(false); @@ -112,7 +110,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private File directory; private File directoryArchive; private BrokerService brokerService; - private AtomicLong storeSize = new AtomicLong(); + private final AtomicLong storeSize = new AtomicLong(); private boolean persistentIndex=true; private boolean useNio = true; private boolean archiveDataLogs=false; @@ -124,8 +122,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH; - private Map> dataFilesInProgress = new ConcurrentHashMap> (); - private String directoryPath = ""; + private final Map> dataFilesInProgress = new ConcurrentHashMap> (); private RandomAccessFile lockFile; private FileLock lock; private boolean disableLocking = DISABLE_LOCKING; @@ -134,6 +131,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private boolean lockAquired; private boolean recoverReferenceStore=true; private boolean forceRecoverReferenceStore=false; + private boolean useDedicatedTaskRunner=false; + private int journalThreadPriority = Thread.MAX_PRIORITY; public String getBrokerName() { return this.brokerName; @@ -165,12 +164,19 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, } else { this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName)); this.directory = new File(directory, "amqstore"); - this.directoryPath=directory.getAbsolutePath(); + directory.getAbsolutePath(); } } if (this.directoryArchive == null) { this.directoryArchive = new File(this.directory,"archive"); } + if (this.brokerService != null) { + this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory(); + }else { + this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler"); + } + this.taskRunnerFactory= new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(), + true, 1000, isUseDedicatedTaskRunner()); IOHelper.mkdirs(this.directory); lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); lock(); @@ -192,10 +198,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, referenceStoreAdapter.setBrokerName(getBrokerName()); referenceStoreAdapter.setUsageManager(usageManager); referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength()); - if (taskRunnerFactory == null) { - taskRunnerFactory = createTaskRunnerFactory(); - } + if (brokerService != null) { + this.scheduler = this.brokerService.getBroker().getScheduler(); + } + if (failIfJournalIsLocked) { asyncDataManager.lock(); } else { @@ -389,7 +396,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, Iterator queueIterator = queues.values().iterator(); while (queueIterator.hasNext()) { final AMQMessageStore ms = queueIterator.next(); - Location mark = (Location)ms.getMark(); + Location mark = ms.getMark(); if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { newMark = mark; } @@ -397,7 +404,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, Iterator topicIterator = topics.values().iterator(); while (topicIterator.hasNext()) { final AMQTopicMessageStore ms = topicIterator.next(); - Location mark = (Location)ms.getMark(); + Location mark = ms.getMark(); if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { newMark = mark; } @@ -726,6 +733,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, deleteAllMessages = true; } + @Override public String toString() { return "AMQPersistenceAdapter(" + directory + ")"; } @@ -754,10 +762,6 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, return adaptor; } - protected TaskRunnerFactory createTaskRunnerFactory() { - return DefaultThreadPools.getDefaultTaskRunnerFactory(); - } - // ///////////////////////////////////////////////////////////////// // Property Accessors // ///////////////////////////////////////////////////////////////// @@ -991,6 +995,28 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) { this.forceRecoverReferenceStore = forceRecoverReferenceStore; } + + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + + /** + * @return the journalThreadPriority + */ + public int getJournalThreadPriority() { + return this.journalThreadPriority; + } + + /** + * @param journalThreadPriority the journalThreadPriority to set + */ + public void setJournalThreadPriority(int journalThreadPriority) { + this.journalThreadPriority = journalThreadPriority; + } protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java index 98235390ba..c561823c1b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java @@ -17,7 +17,6 @@ package org.apache.activemq.store.amq; import java.io.File; - import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.index.hash.HashIndex; import org.apache.activemq.store.PersistenceAdapter; @@ -35,7 +34,6 @@ import org.apache.activemq.util.IOHelper; */ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024; - private TaskRunnerFactory taskRunnerFactory; private File dataDirectory; private int journalThreadPriority = Thread.MAX_PRIORITY; private String brokerName = "localhost"; @@ -56,6 +54,7 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { private boolean forceRecoverReferenceStore=false; private long checkpointInterval = 1000 * 20; private boolean useDedicatedTaskRunner; + private TaskRunnerFactory taskRunnerFactory; /** @@ -82,6 +81,8 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { result.setMaxReferenceFileLength(getMaxReferenceFileLength()); result.setForceRecoverReferenceStore(isForceRecoverReferenceStore()); result.setRecoverReferenceStore(isRecoverReferenceStore()); + result.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); + result.setJournalThreadPriority(getJournalThreadPriority()); return result; } @@ -122,10 +123,6 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { * @return the taskRunnerFactory */ public TaskRunnerFactory getTaskRunnerFactory() { - if (taskRunnerFactory == null) { - taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, - true, 1000, isUseDedicatedTaskRunner()); - } return taskRunnerFactory; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 0442400fba..77cafae1bb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activeio.journal.InvalidRecordLocationException; import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.JournalEventListener; @@ -64,9 +63,9 @@ import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; -import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; @@ -85,7 +84,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve private BrokerService brokerService; - protected static final Scheduler scheduler = Scheduler.getInstance(); + protected Scheduler scheduler; private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class); private Journal journal; @@ -97,20 +96,20 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve private final ConcurrentHashMap topics = new ConcurrentHashMap(); private SystemUsage usageManager; - private long checkpointInterval = 1000 * 60 * 5; + private final long checkpointInterval = 1000 * 60 * 5; private long lastCheckpointRequest = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis(); private int maxCheckpointWorkers = 10; private int maxCheckpointMessageAddSize = 1024 * 1024; - private JournalTransactionStore transactionStore = new JournalTransactionStore(this); + private final JournalTransactionStore transactionStore = new JournalTransactionStore(this); private ThreadPoolExecutor checkpointExecutor; private TaskRunner checkpointTask; private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private boolean fullCheckPoint; - private AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); @@ -267,7 +266,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve recover(); // Do a checkpoint periodically. - scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); + this.scheduler = new Scheduler("Journal Scheduler"); + this.scheduler.start(); + this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); } @@ -278,7 +279,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return; } - scheduler.cancel(periodicCheckpointTask); + this.scheduler.cancel(periodicCheckpointTask); + this.scheduler.stop(); // Take one final checkpoint and stop checkpoint processing. checkpoint(true, true); @@ -723,6 +725,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve longTermPersistence.setBrokerName(brokerName); } + @Override public String toString() { return "JournalPersistenceAdapator(" + longTermPersistence + ")"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 6116f621fe..3f7c711ea2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -415,6 +415,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi @Override public String toString() { - return "KahaDBPersistenceAdapter"; + String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET"; + return "KahaDBPersistenceAdapter[" + path +"]" ; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index ae5da098aa..8b68073747 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -72,6 +72,7 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ServiceStopper; @@ -94,6 +95,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { private boolean concurrentStoreAndDispatchQueues = true; private boolean concurrentStoreAndDispatchTopics = true; private int maxAsyncJobs = MAX_ASYNC_JOBS; + private Scheduler scheduler; public KahaDBStore() { @@ -155,6 +157,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public void doStart() throws Exception { + super.doStart(); this.queueSemaphore = new Semaphore(getMaxAsyncJobs()); this.topicSemaphore = new Semaphore(getMaxAsyncJobs()); this.asyncQueueJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs()); @@ -175,8 +178,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return thread; } }); - super.doStart(); - } @Override @@ -204,6 +205,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { protected void addQueueTask(StoreQueueTask task) throws IOException { try { this.queueSemaphore.acquire(); + } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } @@ -327,7 +329,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isEnableJournalDiskSyncs() && message.isResponseRequired()); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 1425e0be9f..fa1bd05a5b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -82,7 +82,7 @@ import org.apache.kahadb.util.VariableMarshaller; public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { - private BrokerService brokerService; + protected BrokerService brokerService; public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500")); @@ -245,7 +245,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // to see if we need to exit this thread. long sleepTime = Math.min(checkpointInterval, 500); while (opened.get()) { - Thread.sleep(sleepTime); long now = System.currentTimeMillis(); if( now - lastCleanup >= cleanupInterval ) { @@ -276,9 +275,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public void open() throws IOException { if( opened.compareAndSet(false, true) ) { getJournal().start(); - - loadPageFile(); - + loadPageFile(); startCheckpoint(); recover(); } @@ -332,6 +329,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public void close() throws IOException, InterruptedException { if( opened.compareAndSet(true, false)) { synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, true); + } + }); pageFile.unload(); metadata = new Metadata(); } @@ -385,11 +387,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar */ private void recover() throws IllegalStateException, IOException { synchronized (indexMutex) { - long start = System.currentTimeMillis(); - + + long start = System.currentTimeMillis(); Location recoveryPosition = getRecoveryPosition(); if( recoveryPosition!=null ) { int redoCounter = 0; + LOG.info("Recoverying from the journal ..."); while (recoveryPosition != null) { JournalCommand message = load(recoveryPosition); metadata.lastUpdate = recoveryPosition; @@ -398,7 +401,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar recoveryPosition = journal.getNextLocation(recoveryPosition); } long end = System.currentTimeMillis(); - LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); } // We may have to undo some index updates. @@ -693,7 +696,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // from the recovery method too so they need to be idempotent // ///////////////////////////////////////////////////////////////// - private void process(JournalCommand data, final Location location) throws IOException { + void process(JournalCommand data, final Location location) throws IOException { data.visit(new Visitor() { @Override public void visit(KahaAddMessageCommand command) throws IOException { @@ -732,11 +735,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar }); } - private void process(final KahaAddMessageCommand command, final Location location) throws IOException { + protected void process(final KahaAddMessageCommand command, final Location location) throws IOException { if (command.hasTransactionInfo()) { synchronized (indexMutex) { ArrayList inflightTx = getInflightTx(command.getTransactionInfo(), location); inflightTx.add(new AddOpperation(command, location)); + TransactionId key = key(command.getTransactionInfo()); } } else { synchronized (indexMutex) { @@ -836,7 +840,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar protected final Object indexMutex = new Object(); private final HashSet journalFilesBeingReplicated = new HashSet(); - private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { + void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); // Skip adding the message to the index if this is a topic and there are @@ -870,7 +874,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } - private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { + void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); if (!command.hasSubscriptionKey()) { @@ -902,7 +906,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } - private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { + void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); sd.orderIndex.clear(tx); sd.orderIndex.unload(tx); @@ -931,7 +935,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar metadata.destinations.remove(tx, key); } - private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { + void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); // If set then we are creating it.. otherwise we are destroying the sub @@ -961,8 +965,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar * @param tx * @throws IOException */ - private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { - + void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { LOG.debug("Checkpoint started."); metadata.state = OPEN_STATE; diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java b/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java index 0a25daec71..36dd96b473 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java @@ -19,32 +19,25 @@ package org.apache.activemq.thread; import java.util.HashMap; import java.util.Timer; import java.util.TimerTask; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; /** - * Singelton, references maintained by users * @version $Revision$ */ -public final class Scheduler { - - private final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true); - private final HashMap TIMER_TASKS = new HashMap(); - private static Scheduler instance; +public final class Scheduler extends ServiceSupport { + private final String name; + private Timer timer; + private final HashMap timerTasks = new HashMap(); - static { - instance = new Scheduler(); + public Scheduler (String name) { + this.name = name; } - - private Scheduler() { - } - - public static Scheduler getInstance() { - return instance; - } - - public synchronized void executePeriodically(final Runnable task, long period) { + + public void executePeriodically(final Runnable task, long period) { TimerTask timerTask = new SchedulerTimerTask(task); - CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period); - TIMER_TASKS.put(task, timerTask); + timer.scheduleAtFixedRate(timerTask, period, period); + timerTasks.put(task, timerTask); } /* @@ -53,24 +46,38 @@ public final class Scheduler { */ public synchronized void schedualPeriodically(final Runnable task, long period) { TimerTask timerTask = new SchedulerTimerTask(task); - CLOCK_DAEMON.schedule(timerTask, period, period); - TIMER_TASKS.put(task, timerTask); + timer.schedule(timerTask, period, period); + timerTasks.put(task, timerTask); } public synchronized void cancel(Runnable task) { - TimerTask ticket = TIMER_TASKS.remove(task); + TimerTask ticket = timerTasks.remove(task); if (ticket != null) { ticket.cancel(); - CLOCK_DAEMON.purge();//remove cancelled TimerTasks + timer.purge();//remove cancelled TimerTasks } } - public void executeAfterDelay(final Runnable task, long redeliveryDelay) { + public synchronized void executeAfterDelay(final Runnable task, long redeliveryDelay) { TimerTask timerTask = new SchedulerTimerTask(task); - CLOCK_DAEMON.schedule(timerTask, redeliveryDelay); + timer.schedule(timerTask, redeliveryDelay); } public void shutdown() { - CLOCK_DAEMON.cancel(); + timer.cancel(); + } + + @Override + protected synchronized void doStart() throws Exception { + this.timer = new Timer(name, true); + + } + + @Override + protected synchronized void doStop(ServiceStopper stopper) throws Exception { + if (this.timer != null) { + this.timer.cancel(); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java b/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java index e8e27b315a..6795c3ca2c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java @@ -18,6 +18,7 @@ package org.apache.activemq.usage; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.Service; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.kahadb.plist.PListStore; @@ -36,6 +37,7 @@ public class SystemUsage implements Service { private MemoryUsage memoryUsage; private StoreUsage storeUsage; private TempUsage tempUsage; + private ThreadPoolExecutor executor; /** * True if someone called setSendFailIfNoSpace() on this particular usage @@ -45,7 +47,7 @@ public class SystemUsage implements Service { private boolean sendFailIfNoSpace; private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet; private long sendFailIfNoSpaceAfterTimeout = 0; - + private final List children = new CopyOnWriteArrayList(); public SystemUsage() { @@ -58,14 +60,21 @@ public class SystemUsage implements Service { this.memoryUsage = new MemoryUsage(name + ":memory"); this.storeUsage = new StoreUsage(name + ":store", adapter); this.tempUsage = new TempUsage(name + ":temp", tempStore); + this.memoryUsage.setExecutor(getExecutor()); + this.storeUsage.setExecutor(getExecutor()); + this.tempUsage.setExecutor(getExecutor()); } public SystemUsage(SystemUsage parent, String name) { this.parent = parent; + this.executor = parent.getExecutor(); this.name = name; this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory"); this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store"); this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp"); + this.memoryUsage.setExecutor(getExecutor()); + this.storeUsage.setExecutor(getExecutor()); + this.tempUsage.setExecutor(getExecutor()); } public String getName() { @@ -186,6 +195,7 @@ public class SystemUsage implements Service { memoryUsage.setParent(parent.memoryUsage); } this.memoryUsage = memoryUsage; + this.memoryUsage.setExecutor(getExecutor()); } public void setStoreUsage(StoreUsage storeUsage) { @@ -199,6 +209,7 @@ public class SystemUsage implements Service { storeUsage.setParent(parent.storeUsage); } this.storeUsage = storeUsage; + this.storeUsage.setExecutor(executor); } @@ -213,5 +224,30 @@ public class SystemUsage implements Service { tempDiskUsage.setParent(parent.tempUsage); } this.tempUsage = tempDiskUsage; + this.tempUsage.setExecutor(getExecutor()); + } + + /** + * @return the executor + */ + public ThreadPoolExecutor getExecutor() { + return this.executor; + } + + /** + * @param executor + * the executor to set + */ + public void setExecutor(ThreadPoolExecutor executor) { + this.executor = executor; + if (this.memoryUsage != null) { + this.memoryUsage.setExecutor(this.executor); + } + if (this.storeUsage != null) { + this.storeUsage.setExecutor(this.executor); + } + if (this.tempUsage != null) { + this.tempUsage.setExecutor(this.executor); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java index 7e2aa336d5..12d10d0161 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java @@ -21,13 +21,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,7 +38,6 @@ import org.apache.commons.logging.LogFactory; public abstract class Usage implements Service { private static final Log LOG = LogFactory.getLog(Usage.class); - private static ThreadPoolExecutor executor; protected final Object usageMutex = new Object(); protected int percentUsage; protected T parent; @@ -53,12 +47,11 @@ public abstract class Usage implements Service { private final boolean debug = LOG.isDebugEnabled(); private String name; private float usagePortion = 1.0f; - private List children = new CopyOnWriteArrayList(); + private final List children = new CopyOnWriteArrayList(); private final List callbacks = new LinkedList(); private int pollingTime = 100; - - private AtomicBoolean started=new AtomicBoolean(); - + private final AtomicBoolean started=new AtomicBoolean(); + private ThreadPoolExecutor executor; public Usage(T parent, String name, float portion) { this.parent = parent; this.usagePortion = portion; @@ -289,6 +282,7 @@ public abstract class Usage implements Service { return name; } + @Override public String toString() { return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%"; } @@ -411,18 +405,10 @@ public abstract class Usage implements Service { this.parent = parent; } - protected Executor getExecutor() { + public void setExecutor (ThreadPoolExecutor executor) { + this.executor = executor; + } + public ThreadPoolExecutor getExecutor() { return executor; } - - static { - executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Usage Async Task"); - thread.setDaemon(true); - return thread; - } - }); - } - } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java index 77d3ef841c..0a21f15752 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java @@ -16,14 +16,23 @@ */ package org.apache.activemq.broker; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.IOHelper; public class BrokerRestartTestSupport extends BrokerTestSupport { private PersistenceAdapter persistenceAdapter; + @Override protected BrokerService createBroker() throws Exception { BrokerService broker = new BrokerService(); + File dir = broker.getBrokerDataDirectory(); + if (dir != null) { + IOHelper.deleteChildren(dir); + } //broker.setPersistent(false); broker.setDeleteAllMessagesOnStartup(true); persistenceAdapter = broker.getPersistenceAdapter(); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java index 890cb8ea72..8b3f1a6c23 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java @@ -17,13 +17,14 @@ package org.apache.activemq.bugs; import java.io.File; +import java.util.ArrayList; +import java.util.List; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -41,9 +42,7 @@ import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; - import junit.framework.Test; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; @@ -121,7 +120,7 @@ public class DurableConsumerTest extends CombinationTestSupport{ } private class MessagePublisher implements Runnable{ - private boolean shouldPublish = true; + private final boolean shouldPublish = true; public void run(){ TopicConnectionFactory topicConnectionFactory = null; @@ -170,13 +169,14 @@ public class DurableConsumerTest extends CombinationTestSupport{ Thread publisherThread = new Thread(new MessagePublisher()); publisherThread.start(); - + final List list = new ArrayList(); for (int i = 0; i < 100; i++) { final int id = i; Thread thread = new Thread(new Runnable(){ public void run(){ - new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME); + SimpleTopicSubscriber s =new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME); + list.add(s); } }); thread.start(); @@ -189,6 +189,9 @@ public class DurableConsumerTest extends CombinationTestSupport{ configurePersistence(broker); broker.start(); Thread.sleep(10000); + for (SimpleTopicSubscriber s:list) { + s.closeConnection(); + } assertEquals(0, exceptions.size()); } @@ -358,6 +361,7 @@ public class DurableConsumerTest extends CombinationTestSupport{ } + @Override protected void setUp() throws Exception{ if (broker == null) { broker = createBroker(true); @@ -366,6 +370,7 @@ public class DurableConsumerTest extends CombinationTestSupport{ super.setUp(); } + @Override protected void tearDown() throws Exception{ super.tearDown(); if (broker != null) { @@ -392,11 +397,13 @@ public class DurableConsumerTest extends CombinationTestSupport{ protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{ answer.setDeleteAllMessagesOnStartup(deleteStore); KahaDBStore kaha = new KahaDBStore(); + //kaha.setConcurrentStoreAndDispatchTopics(false); File directory = new File("target/activemq-data/kahadb"); if (deleteStore) { IOHelper.deleteChildren(directory); } kaha.setDirectory(directory); + //kaha.setMaxAsyncJobs(10); answer.setPersistenceAdapter(kaha); answer.addConnector(bindAddress); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java b/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java index 8daa9ce0cf..3b4f2fd50f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java @@ -26,7 +26,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.thread.Scheduler; import org.apache.log4j.Logger; public class EmbeddedActiveMQ @@ -39,6 +38,7 @@ public class EmbeddedActiveMQ BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); BrokerService brokerService = null; + Connection connection = null; logger.info("Start..."); try @@ -49,7 +49,7 @@ public class EmbeddedActiveMQ logger.info("Broker '" + brokerService.getBrokerName() + "' is starting........"); brokerService.start(); ConnectionFactory fac = new ActiveMQConnectionFactory("vm://TestMQ"); - Connection connection = fac.createConnection(); + connection = fac.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("TEST.QUEUE"); MessageProducer producer = session.createProducer(queue); @@ -71,12 +71,9 @@ public class EmbeddedActiveMQ try { br.close(); - Scheduler scheduler = Scheduler.getInstance(); - scheduler.shutdown(); logger.info("Broker '" + brokerService.getBrokerName() + "' is stopping........"); - brokerService.stop(); - Scheduler.getInstance().shutdown(); - + connection.close(); + brokerService.stop(); sleep(8); logger.info(ThreadExplorer.show("Active threads after stop:")); @@ -90,7 +87,7 @@ public class EmbeddedActiveMQ logger.info("Waiting for list theads is greater then 1 ..."); int numTh = ThreadExplorer.active(); - while (numTh > 1) + while (numTh > 2) { sleep(3); numTh = ThreadExplorer.active(); diff --git a/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java index 2c1a37dd90..34e00f0661 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java @@ -18,15 +18,16 @@ package org.apache.activemq.network; import javax.jms.MessageProducer; import javax.jms.TemporaryQueue; - import org.apache.activemq.broker.BrokerService; public class DuplexNetworkTest extends SimpleNetworkTest { + @Override protected String getLocalBrokerURI() { return "org/apache/activemq/network/duplexLocalBroker.xml"; } + @Override protected BrokerService createRemoteBroker() throws Exception { BrokerService broker = new BrokerService(); broker.setBrokerName("remoteBroker"); diff --git a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index 4cdb73a0b2..f4ec1682be 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.network; import java.net.URI; - import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -30,9 +29,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicRequestor; import javax.jms.TopicSession; - import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; @@ -174,11 +171,13 @@ public class SimpleNetworkTest extends TestCase { } } + @Override protected void setUp() throws Exception { super.setUp(); doSetUp(); } + @Override protected void tearDown() throws Exception { localBroker.deleteAllMessages(); remoteBroker.deleteAllMessages(); diff --git a/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java b/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java index 159f5bb919..03f78f7d62 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java @@ -20,10 +20,11 @@ package org.apache.activemq.usage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; - import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -31,6 +32,7 @@ import org.junit.Test; public class MemoryUsageTest { MemoryUsage underTest; + ThreadPoolExecutor executor; @Test public final void testPercentUsageNeedsNoThread() { @@ -46,6 +48,7 @@ public class MemoryUsageTest { public final void testAddUsageListenerStartsThread() throws Exception { int activeThreadCount = Thread.activeCount(); underTest = new MemoryUsage(); + underTest.setExecutor(executor); underTest.setLimit(10); underTest.start(); final CountDownLatch called = new CountDownLatch(1); @@ -66,12 +69,24 @@ public class MemoryUsageTest { @Before public void setUp() throws Exception { - underTest = new MemoryUsage(); + underTest = new MemoryUsage(); + this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Usage Async Task"); + thread.setDaemon(true); + return thread; + } + }); + underTest.setExecutor(this.executor); + } @After public void tearDown() { assertNotNull(underTest); underTest.stop(); + if (this.executor != null) { + this.executor.shutdownNow(); + } } }