Robert Davies 2010-05-20 12:02:10 +00:00
parent 1b6d397aab
commit 1a5ad284ed
43 changed files with 524 additions and 232 deletions

View File

@ -87,6 +87,7 @@ import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.state.CommandVisitorAdapter; import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
@ -114,7 +115,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected boolean alwaysSessionAsync = true; protected boolean alwaysSessionAsync = true;
private TaskRunnerFactory sessionTaskRunner; private TaskRunnerFactory sessionTaskRunner;
private final ThreadPoolExecutor asyncConnectionThread; private final ThreadPoolExecutor executor;
// Connection state variables // Connection state variables
private final ConnectionInfo info; private final ConnectionInfo info;
@ -188,6 +189,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean useDedicatedTaskRunner; private boolean useDedicatedTaskRunner;
protected volatile CountDownLatch transportInterruptionProcessingComplete; protected volatile CountDownLatch transportInterruptionProcessingComplete;
private long consumerFailoverRedeliveryWaitPeriod; private long consumerFailoverRedeliveryWaitPeriod;
private final Scheduler scheduler;
/** /**
* Construct an <code>ActiveMQConnection</code> * Construct an <code>ActiveMQConnection</code>
@ -204,16 +206,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Configure a single threaded executor who's core thread can timeout if // Configure a single threaded executor who's core thread can timeout if
// idle // idle
asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport); Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
thread.setDaemon(true); thread.setDaemon(true);
return thread; return thread;
} }
}); });
// asyncConnectionThread.allowCoreThreadTimeOut(true); // asyncConnectionThread.allowCoreThreadTimeOut(true);
String uniqueId = CONNECTION_ID_GENERATOR.generateId();
this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId())); this.info = new ConnectionInfo(new ConnectionId(uniqueId));
this.info.setManageable(true); this.info.setManageable(true);
this.info.setFaultTolerant(transport.isFaultTolerant()); this.info.setFaultTolerant(transport.isFaultTolerant());
this.connectionSessionId = new SessionId(info.getConnectionId(), -1); this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
@ -224,6 +226,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.factoryStats.addConnection(this); this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis(); this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
this.scheduler.start();
} }
protected void setUserName(String userName) { protected void setUserName(String userName) {
@ -609,6 +613,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
advisoryConsumer.dispose(); advisoryConsumer.dispose();
advisoryConsumer = null; advisoryConsumer = null;
} }
if (this.scheduler != null) {
try {
this.scheduler.stop();
} catch (Exception e) {
JMSException ex = JMSExceptionSupport.create(e);
throw ex;
}
}
long lastDeliveredSequenceId = 0; long lastDeliveredSequenceId = 0;
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
@ -656,8 +668,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
} finally { } finally {
try { try {
if (asyncConnectionThread != null){ if (executor != null){
asyncConnectionThread.shutdown(); executor.shutdown();
} }
}catch(Throwable e) { }catch(Throwable e) {
LOG.error("Error shutting down thread pool " + e,e); LOG.error("Error shutting down thread pool " + e,e);
@ -1719,7 +1731,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
@Override @Override
public Response processConnectionError(final ConnectionError error) throws Exception { public Response processConnectionError(final ConnectionError error) throws Exception {
asyncConnectionThread.execute(new Runnable() { executor.execute(new Runnable() {
public void run() { public void run() {
onAsyncException(error.getException()); onAsyncException(error.getException());
} }
@ -1779,7 +1791,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void onClientInternalException(final Throwable error) { public void onClientInternalException(final Throwable error) {
if ( !closed.get() && !closing.get() ) { if ( !closed.get() && !closing.get() ) {
if ( this.clientInternalExceptionListener != null ) { if ( this.clientInternalExceptionListener != null ) {
asyncConnectionThread.execute(new Runnable() { executor.execute(new Runnable() {
public void run() { public void run() {
ActiveMQConnection.this.clientInternalExceptionListener.onException(error); ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
} }
@ -1804,7 +1816,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
final JMSException e = (JMSException)error; final JMSException e = (JMSException)error;
asyncConnectionThread.execute(new Runnable() { executor.execute(new Runnable() {
public void run() { public void run() {
ActiveMQConnection.this.exceptionListener.onException(e); ActiveMQConnection.this.exceptionListener.onException(e);
} }
@ -1819,7 +1831,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void onException(final IOException error) { public void onException(final IOException error) {
onAsyncException(error); onAsyncException(error);
if (!closing.get() && !closed.get()) { if (!closing.get() && !closed.get()) {
asyncConnectionThread.execute(new Runnable() { executor.execute(new Runnable() {
public void run() { public void run() {
transportFailed(error); transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport); ServiceSupport.dispose(ActiveMQConnection.this.transport);
@ -2297,4 +2309,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public long getConsumerFailoverRedeliveryWaitPeriod() { public long getConsumerFailoverRedeliveryWaitPeriod() {
return consumerFailoverRedeliveryWaitPeriod; return consumerFailoverRedeliveryWaitPeriod;
} }
protected Scheduler getScheduler() {
return this.scheduler;
}
protected ThreadPoolExecutor getExecutor() {
return this.executor;
}
} }

View File

@ -29,7 +29,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException; import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -37,7 +36,6 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.TransactionRolledBackException; import javax.jms.TransactionRolledBackException;
import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -111,7 +109,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class); private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
protected static final Scheduler scheduler = Scheduler.getInstance(); protected final Scheduler scheduler;
protected final ActiveMQSession session; protected final ActiveMQSession session;
protected final ConsumerInfo info; protected final ConsumerInfo info;
@ -130,17 +128,17 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private int ackCounter; private int ackCounter;
private int dispatchedCount; private int dispatchedCount;
private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>(); private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
private JMSConsumerStatsImpl stats; private final JMSConsumerStatsImpl stats;
private final String selector; private final String selector;
private boolean synchronizationRegistered; private boolean synchronizationRegistered;
private AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private MessageAvailableListener availableListener; private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy; private RedeliveryPolicy redeliveryPolicy;
private boolean optimizeAcknowledge; private boolean optimizeAcknowledge;
private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
private ExecutorService executorService; private ExecutorService executorService;
private MessageTransformer transformer; private MessageTransformer transformer;
private boolean clearDispatchList; private boolean clearDispatchList;
@ -152,7 +150,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private IOException failureError; private IOException failureError;
private long optimizeAckTimestamp = System.currentTimeMillis(); private long optimizeAckTimestamp = System.currentTimeMillis();
private long optimizeAckTimeout = 300; private final long optimizeAckTimeout = 300;
private long failoverRedeliveryWaitPeriod = 0; private long failoverRedeliveryWaitPeriod = 0;
/** /**
@ -202,6 +200,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
this.session = session; this.session = session;
this.scheduler = session.getScheduler();
this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
setTransformer(session.getTransformer()); setTransformer(session.getTransformer());
@ -634,10 +633,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
if (session.getTransactionContext().isInTransaction()) { if (session.getTransactionContext().isInTransaction()) {
session.getTransactionContext().addSynchronization(new Synchronization() { session.getTransactionContext().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
doClose(); doClose();
} }
@Override
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
doClose(); doClose();
} }
@ -912,16 +913,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (!synchronizationRegistered) { if (!synchronizationRegistered) {
synchronizationRegistered = true; synchronizationRegistered = true;
session.getTransactionContext().addSynchronization(new Synchronization() { session.getTransactionContext().addSynchronization(new Synchronization() {
@Override
public void beforeEnd() throws Exception { public void beforeEnd() throws Exception {
acknowledge(); acknowledge();
synchronizationRegistered = false; synchronizationRegistered = false;
} }
@Override
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
commit(); commit();
synchronizationRegistered = false; synchronizationRegistered = false;
} }
@Override
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
rollback(); rollback();
synchronizationRegistered = false; synchronizationRegistered = false;
@ -1325,6 +1329,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
unconsumedMessages.stop(); unconsumedMessages.stop();
} }
@Override
public String toString() { public String toString() {
return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
+ " }"; + " }";

View File

@ -19,13 +19,11 @@ package org.apache.activemq;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException; import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
@ -73,9 +71,9 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
protected ProducerInfo info; protected ProducerInfo info;
protected boolean closed; protected boolean closed;
private JMSProducerStatsImpl stats; private final JMSProducerStatsImpl stats;
private AtomicLong messageSequence; private AtomicLong messageSequence;
private long startTime; private final long startTime;
private MessageTransformer transformer; private MessageTransformer transformer;
private MemoryUsage producerWindow; private MemoryUsage producerWindow;
@ -93,6 +91,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
// size > 0 // size > 0
if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) { if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
producerWindow = new MemoryUsage("Producer Window: " + producerId); producerWindow = new MemoryUsage("Producer Window: " + producerId);
producerWindow.setExecutor(session.getConnectionExecutor());
producerWindow.setLimit(this.info.getWindowSize()); producerWindow.setLimit(this.info.getWindowSize());
producerWindow.start(); producerWindow.start();
} }
@ -164,6 +163,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
* *
* @throws IllegalStateException * @throws IllegalStateException
*/ */
@Override
protected void checkClosed() throws IllegalStateException { protected void checkClosed() throws IllegalStateException {
if (closed) { if (closed) {
throw new IllegalStateException("The producer is closed"); throw new IllegalStateException("The producer is closed");
@ -280,6 +280,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
this.info = info; this.info = info;
} }
@Override
public String toString() { public String toString() {
return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }"; return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
} }

View File

@ -24,8 +24,8 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
@ -53,7 +53,6 @@ import javax.jms.TopicPublisher;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException; import javax.jms.TransactionRolledBackException;
import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader; import org.apache.activemq.blob.BlobUploader;
@ -198,7 +197,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} }
private static final Log LOG = LogFactory.getLog(ActiveMQSession.class); private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
protected static final Scheduler scheduler = Scheduler.getInstance(); private final Scheduler scheduler;
private final ThreadPoolExecutor connectionExecutor;
protected int acknowledgementMode; protected int acknowledgementMode;
protected final ActiveMQConnection connection; protected final ActiveMQConnection connection;
@ -220,7 +220,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected Object sendMutex = new Object(); protected Object sendMutex = new Object();
private MessageListener messageListener; private MessageListener messageListener;
private JMSSessionStatsImpl stats; private final JMSSessionStatsImpl stats;
private TransactionContext transactionContext; private TransactionContext transactionContext;
private DeliveryListener deliveryListener; private DeliveryListener deliveryListener;
private MessageTransformer transformer; private MessageTransformer transformer;
@ -251,7 +251,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
this.connection.asyncSendPacket(info); this.connection.asyncSendPacket(info);
setTransformer(connection.getTransformer()); setTransformer(connection.getTransformer());
setBlobTransferPolicy(connection.getBlobTransferPolicy()); setBlobTransferPolicy(connection.getBlobTransferPolicy());
this.scheduler=connection.getScheduler();
this.connectionExecutor=connection.getExecutor();
if (connection.isStarted()) { if (connection.isStarted()) {
start(); start();
} }
@ -613,11 +614,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
synchronizationRegistered = true; synchronizationRegistered = true;
getTransactionContext().addSynchronization(new Synchronization() { getTransactionContext().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
doClose(); doClose();
synchronizationRegistered = false; synchronizationRegistered = false;
} }
@Override
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
doClose(); doClose();
synchronizationRegistered = false; synchronizationRegistered = false;
@ -846,6 +849,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
if (ack.getTransactionId() != null) { if (ack.getTransactionId() != null) {
getTransactionContext().addSynchronization(new Synchronization() { getTransactionContext().addSynchronization(new Synchronization() {
@Override
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
md.getMessage().onMessageRolledBack(); md.getMessage().onMessageRolledBack();
// ensure we don't filter this as a duplicate // ensure we don't filter this as a duplicate
@ -1947,6 +1951,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
return executor.getUnconsumedMessages(); return executor.getUnconsumedMessages();
} }
@Override
public String toString() { public String toString() {
return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}"; return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
} }
@ -2025,4 +2030,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
syncSendPacket(ack); syncSendPacket(ack);
} }
} }
protected Scheduler getScheduler() {
return this.scheduler;
}
protected ThreadPoolExecutor getConnectionExecutor() {
return this.connectionExecutor;
}
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker;
import java.net.URI; import java.net.URI;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -33,6 +34,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -373,5 +375,9 @@ public interface Broker extends Region, Service {
*/ */
void nowMasterBroker(); void nowMasterBroker();
Scheduler getScheduler();
ThreadPoolExecutor getExecutor();
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -40,6 +41,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -300,4 +302,12 @@ public class BrokerFilter implements Broker {
ConsumerControl control) { ConsumerControl control) {
next.processConsumerControl(consumerExchange, control); next.processConsumerControl(consumerExchange, control);
} }
public Scheduler getScheduler() {
return next.getScheduler();
}
public ThreadPoolExecutor getExecutor() {
return next.getExecutor();
}
} }

View File

@ -29,6 +29,9 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -78,9 +81,10 @@ import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory; import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
@ -188,7 +192,8 @@ public class BrokerService implements Service {
private IOExceptionHandler ioExceptionHandler; private IOExceptionHandler ioExceptionHandler;
private boolean schedulerSupport = true; private boolean schedulerSupport = true;
private File schedulerDirectoryFile; private File schedulerDirectoryFile;
private Scheduler scheduler;
private ThreadPoolExecutor executor;
private boolean slave = true; private boolean slave = true;
static { static {
@ -589,6 +594,15 @@ public class BrokerService implements Service {
} }
} }
} }
if (this.taskRunnerFactory != null) {
this.taskRunnerFactory.shutdown();
}
if (this.scheduler != null) {
this.scheduler.stop();
}
if (this.executor != null) {
this.executor.shutdownNow();
}
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped"); LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
synchronized (shutdownHooks) { synchronized (shutdownHooks) {
for (Runnable hook : shutdownHooks) { for (Runnable hook : shutdownHooks) {
@ -756,9 +770,6 @@ public class BrokerService implements Service {
} }
public PersistenceAdapterFactory getPersistenceFactory() { public PersistenceAdapterFactory getPersistenceFactory() {
if (persistenceFactory == null) {
persistenceFactory = createPersistenceFactory();
}
return persistenceFactory; return persistenceFactory;
} }
@ -848,6 +859,7 @@ public class BrokerService implements Service {
try { try {
if (systemUsage == null) { if (systemUsage == null) {
systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore()); systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
systemUsage.setExecutor(getExecutor());
systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
// 64 // 64
// Meg // Meg
@ -869,6 +881,9 @@ public class BrokerService implements Service {
removeService(this.systemUsage); removeService(this.systemUsage);
} }
this.systemUsage = memoryManager; this.systemUsage = memoryManager;
if (this.systemUsage.getExecutor()==null) {
this.systemUsage.setExecutor(getExecutor());
}
addService(this.systemUsage); addService(this.systemUsage);
} }
@ -953,11 +968,11 @@ public class BrokerService implements Service {
} }
public TaskRunnerFactory getTaskRunnerFactory() { public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) { if (this.taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000, this.taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000,
isDedicatedTaskRunner()); isDedicatedTaskRunner());
} }
return taskRunnerFactory; return this.taskRunnerFactory;
} }
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
@ -1769,10 +1784,10 @@ public class BrokerService implements Service {
RegionBroker regionBroker; RegionBroker regionBroker;
if (isUseJmx()) { if (isUseJmx()) {
regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor); getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
} else { } else {
regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
destinationInterceptor); destinationInterceptor,getScheduler(),getExecutor());
} }
destinationFactory.setRegionBroker(regionBroker); destinationFactory.setRegionBroker(regionBroker);
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
@ -1850,20 +1865,20 @@ public class BrokerService implements Service {
protected PersistenceAdapter createPersistenceAdapter() throws IOException { protected PersistenceAdapter createPersistenceAdapter() throws IOException {
if (isPersistent()) { if (isPersistent()) {
return getPersistenceFactory().createPersistenceAdapter(); PersistenceAdapterFactory fac = getPersistenceFactory();
if (fac != null) {
return fac.createPersistenceAdapter();
}else {
KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
File dir = new File(getBrokerDataDirectory(),"KahaDB");
adaptor.setDirectory(dir);
return adaptor;
}
} else { } else {
return new MemoryPersistenceAdapter(); return new MemoryPersistenceAdapter();
} }
} }
protected AMQPersistenceAdapterFactory createPersistenceFactory() {
AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory();
factory.setDataDirectory(getBrokerDataDirectory());
factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory());
factory.setBrokerName(getBrokerName());
return factory;
}
protected ObjectName createBrokerObjectName() throws IOException { protected ObjectName createBrokerObjectName() throws IOException {
try { try {
return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
@ -2125,6 +2140,31 @@ public class BrokerService implements Service {
} }
} }
protected synchronized ThreadPoolExecutor getExecutor() {
if (this.executor == null) {
this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Usage Async Task");
thread.setDaemon(true);
return thread;
}
});
}
return this.executor;
}
protected synchronized Scheduler getScheduler() {
if (this.scheduler==null) {
this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
try {
this.scheduler.start();
} catch (Exception e) {
LOG.error("Failed to start Scheduler ",e);
}
}
return this.scheduler;
}
public Broker getRegionBroker() { public Broker getRegionBroker() {
return regionBroker; return regionBroker;
} }
@ -2252,6 +2292,4 @@ public class BrokerService implements Service {
public void setSchedulerDirectory(String schedulerDirectory) { public void setSchedulerDirectory(String schedulerDirectory) {
setSchedulerDirectoryFile(new File(schedulerDirectory)); setSchedulerDirectoryFile(new File(schedulerDirectory));
} }
} }

View File

@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -41,6 +42,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -283,4 +285,12 @@ public class EmptyBroker implements Broker {
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) { ConsumerControl control) {
} }
public Scheduler getScheduler() {
return null;
}
public ThreadPoolExecutor getExecutor() {
return null;
}
} }

View File

@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -41,6 +42,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -302,4 +304,12 @@ public class ErrorBroker implements Broker {
ConsumerControl control) { ConsumerControl control) {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public Scheduler getScheduler() {
throw new BrokerStoppedException(this.message);
}
public ThreadPoolExecutor getExecutor() {
throw new BrokerStoppedException(this.message);
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -41,6 +42,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -312,4 +314,12 @@ public class MutableBrokerFilter implements Broker {
getNext().processConsumerControl(consumerExchange, control); getNext().processConsumerControl(consumerExchange, control);
} }
public Scheduler getScheduler() {
return getNext().getScheduler();
}
public ThreadPoolExecutor getExecutor() {
return getNext().getExecutor();
}
} }

View File

@ -16,6 +16,28 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ThreadPoolExecutor;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -41,6 +63,7 @@ import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.JMXSupport;
@ -48,27 +71,6 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
public class ManagedRegionBroker extends RegionBroker { public class ManagedRegionBroker extends RegionBroker {
private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class); private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class);
@ -91,18 +93,20 @@ public class ManagedRegionBroker extends RegionBroker {
private Broker contextBroker; private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor); super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
this.managementContext = context; this.managementContext = context;
this.brokerObjectName = brokerObjectName; this.brokerObjectName = brokerObjectName;
} }
@Override
public void start() throws Exception { public void start() throws Exception {
super.start(); super.start();
// build all existing durable subscriptions // build all existing durable subscriptions
buildExistingSubscriptions(); buildExistingSubscriptions();
} }
@Override
protected void doStop(ServiceStopper stopper) { protected void doStop(ServiceStopper stopper) {
super.doStop(stopper); super.doStop(stopper);
// lets remove any mbeans not yet removed // lets remove any mbeans not yet removed
@ -119,18 +123,22 @@ public class ManagedRegionBroker extends RegionBroker {
registeredMBeans.clear(); registeredMBeans.clear();
} }
@Override
protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
} }
@Override
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
} }
@Override
protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
} }
@Override
protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
} }

View File

@ -17,9 +17,7 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import javax.jms.ResourceAllocationException; import javax.jms.ResourceAllocationException;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;

View File

@ -55,6 +55,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
this.persistenceAdapter = persistenceAdapter; this.persistenceAdapter = persistenceAdapter;
} }
@Override
public void setRegionBroker(RegionBroker broker) { public void setRegionBroker(RegionBroker broker) {
if (broker == null) { if (broker == null) {
throw new IllegalArgumentException("null broker"); throw new IllegalArgumentException("null broker");
@ -62,6 +63,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
this.broker = broker; this.broker = broker;
} }
@Override
public Set<ActiveMQDestination> getDestinations() { public Set<ActiveMQDestination> getDestinations() {
return persistenceAdapter.getDestinations(); return persistenceAdapter.getDestinations();
} }
@ -69,6 +71,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
/** /**
* @return instance of {@link Queue} or {@link Topic} * @return instance of {@link Queue} or {@link Topic}
*/ */
@Override
public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception { public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
if (destination.isQueue()) { if (destination.isQueue()) {
if (destination.isTemporary()) { if (destination.isTemporary()) {
@ -100,6 +103,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
} }
} }
@Override
public void removeDestination(Destination dest) { public void removeDestination(Destination dest) {
ActiveMQDestination destination = dest.getActiveMQDestination(); ActiveMQDestination destination = dest.getActiveMQDestination();
if (!destination.isTemporary()) { if (!destination.isTemporary()) {
@ -131,11 +135,12 @@ public class DestinationFactoryImpl extends DestinationFactory {
if (broker.getDestinationPolicy() != null) { if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) { if (entry != null) {
entry.configure(topic); entry.configure(broker,topic);
} }
} }
} }
@Override
public long getLastMessageBrokerSequenceId() throws IOException { public long getLastMessageBrokerSequenceId() throws IOException {
return persistenceAdapter.getLastMessageBrokerSequenceId(); return persistenceAdapter.getLastMessageBrokerSequenceId();
} }
@ -144,6 +149,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
return persistenceAdapter; return persistenceAdapter;
} }
@Override
public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException { public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions(); return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
} }

View File

@ -23,10 +23,8 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -55,7 +53,7 @@ import org.apache.commons.logging.LogFactory;
public abstract class PrefetchSubscription extends AbstractSubscription { public abstract class PrefetchSubscription extends AbstractSubscription {
private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class); private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
protected static final Scheduler scheduler = Scheduler.getInstance(); protected final Scheduler scheduler;
protected PendingMessageCursor pending; protected PendingMessageCursor pending;
protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>(); protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
@ -70,12 +68,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
private final Object pendingLock = new Object(); private final Object pendingLock = new Object();
private final Object dispatchLock = new Object(); private final Object dispatchLock = new Object();
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(); protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException { public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker,context, info); super(broker,context, info);
this.usageManager=usageManager; this.usageManager=usageManager;
pending = cursor; pending = cursor;
this.scheduler = broker.getScheduler();
} }
public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@ -230,6 +229,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
context.getTransaction().addSynchronization( context.getTransaction().addSynchronization(
new Synchronization() { new Synchronization() {
@Override
public void afterCommit() public void afterCommit()
throws Exception { throws Exception {
synchronized(dispatchLock) { synchronized(dispatchLock) {
@ -239,6 +239,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
} }
@Override
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
synchronized(dispatchLock) { synchronized(dispatchLock) {
if (isSlave()) { if (isSlave()) {
@ -486,6 +487,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9); return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
} }
@Override
public int countBeforeFull() { public int countBeforeFull() {
return info.getPrefetchSize() + prefetchExtension - dispatched.size(); return info.getPrefetchSize() + prefetchExtension - dispatched.size();
} }
@ -510,6 +512,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
return enqueueCounter; return enqueueCounter;
} }
@Override
public boolean isRecoveryRequired() { public boolean isRecoveryRequired() {
return pending.isRecoveryRequired(); return pending.isRecoveryRequired();
} }
@ -526,13 +529,15 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
} }
public void add(ConnectionContext context, Destination destination) throws Exception { @Override
public void add(ConnectionContext context, Destination destination) throws Exception {
synchronized(pendingLock) { synchronized(pendingLock) {
super.add(context, destination); super.add(context, destination);
pending.add(context, destination); pending.add(context, destination);
} }
} }
@Override
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>(); List<MessageReference> rc = new ArrayList<MessageReference>();
synchronized(pendingLock) { synchronized(pendingLock) {
@ -546,7 +551,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
synchronized(dispatchLock) { synchronized(dispatchLock) {
for (MessageReference r : dispatched) { for (MessageReference r : dispatched) {
if( r.getRegionDestination() == destination) { if( r.getRegionDestination() == destination) {
rc.add((QueueMessageReference)r); rc.add(r);
} }
} }
destination.getDestinationStatistics().getDispatched().subtract(dispatched.size()); destination.getDestinationStatistics().getDispatched().subtract(dispatched.size());

View File

@ -125,7 +125,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}; };
private final Object iteratingMutex = new Object() {}; private final Object iteratingMutex = new Object() {};
private static final Scheduler scheduler = Scheduler.getInstance(); private final Scheduler scheduler;
class TimeoutMessage implements Delayed { class TimeoutMessage implements Delayed {
@ -203,6 +203,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
super(brokerService, store, destination, parentStats); super(brokerService, store, destination, parentStats);
this.taskFactory = taskFactory; this.taskFactory = taskFactory;
this.dispatchSelector = new QueueDispatchSelector(destination); this.dispatchSelector = new QueueDispatchSelector(destination);
this.scheduler = brokerService.getBroker().getScheduler();
} }
public List<Subscription> getConsumers() { public List<Subscription> getConsumers() {

View File

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
@ -57,6 +58,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.state.ConnectionState; import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.BrokerSupport;
@ -98,10 +100,14 @@ public class RegionBroker extends EmptyBroker {
private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
private final DestinationInterceptor destinationInterceptor; private final DestinationInterceptor destinationInterceptor;
private ConnectionContext adminConnectionContext; private ConnectionContext adminConnectionContext;
private final Scheduler scheduler;
private final ThreadPoolExecutor executor;
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
DestinationInterceptor destinationInterceptor) throws IOException { DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
this.brokerService = brokerService; this.brokerService = brokerService;
this.executor=executor;
this.scheduler = scheduler;
if (destinationFactory == null) { if (destinationFactory == null) {
throw new IllegalArgumentException("null destinationFactory"); throw new IllegalArgumentException("null destinationFactory");
} }
@ -810,6 +816,16 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public Scheduler getScheduler() {
return this.scheduler;
}
public ThreadPoolExecutor getExecutor() {
return this.executor;
}
@Override @Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
ActiveMQDestination destination = control.getDestination(); ActiveMQDestination destination = control.getDestination();

View File

@ -231,7 +231,7 @@ public class TopicRegion extends AbstractRegion {
if (broker.getDestinationPolicy() != null) { if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) { if (entry != null) {
entry.configure(topic); entry.configure(broker,topic);
} }
} }
} }

View File

@ -5,7 +5,6 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -24,15 +23,19 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class); private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class);
private static final Scheduler scheduler = Scheduler.getInstance(); private Scheduler scheduler;
private AtomicBoolean taskStarted = new AtomicBoolean(false); private final AtomicBoolean taskStarted = new AtomicBoolean(false);
private Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>(); private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
private long maxSlowCount = -1; private long maxSlowCount = -1;
private long maxSlowDuration = 30*1000; private long maxSlowDuration = 30*1000;
private long checkPeriod = 30*1000; private long checkPeriod = 30*1000;
private boolean abortConnection = false; private boolean abortConnection = false;
public void setScheduler(Scheduler s) {
this.scheduler=s;
}
public void slowConsumer(ConnectionContext context, Subscription subs) { public void slowConsumer(ConnectionContext context, Subscription subs) {
if (maxSlowCount < 0 && maxSlowDuration < 0) { if (maxSlowCount < 0 && maxSlowDuration < 0) {
// nothing to do // nothing to do

View File

@ -18,7 +18,7 @@ package org.apache.activemq.broker.region.policy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
@ -118,4 +118,7 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover
return result.toArray(new Message[result.size()]); return result.toArray(new Message[result.size()]);
} }
public void setBroker(Broker broker) {
}
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
@ -110,6 +111,9 @@ public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecover
return buffer.browse(destination); return buffer.browse(destination);
} }
public void setBroker(Broker broker) {
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
@ -69,4 +70,7 @@ public class LastImageSubscriptionRecoveryPolicy implements SubscriptionRecovery
return new LastImageSubscriptionRecoveryPolicy(); return new LastImageSubscriptionRecoveryPolicy();
} }
public void setBroker(Broker broker) {
}
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
@ -53,4 +54,7 @@ public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy
return new Message[0]; return new Message[0];
} }
public void setBroker(Broker broker) {
}
} }

View File

@ -90,7 +90,7 @@ public class PolicyEntry extends DestinationMapEntry {
public void configure(Broker broker,Queue queue) { public void configure(Broker broker,Queue queue) {
baseConfiguration(queue); baseConfiguration(broker,queue);
if (dispatchPolicy != null) { if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy); queue.setDispatchPolicy(dispatchPolicy);
} }
@ -112,14 +112,16 @@ public class PolicyEntry extends DestinationMapEntry {
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
} }
public void configure(Topic topic) { public void configure(Broker broker,Topic topic) {
baseConfiguration(topic); baseConfiguration(broker,topic);
if (dispatchPolicy != null) { if (dispatchPolicy != null) {
topic.setDispatchPolicy(dispatchPolicy); topic.setDispatchPolicy(dispatchPolicy);
} }
topic.setDeadLetterStrategy(getDeadLetterStrategy()); topic.setDeadLetterStrategy(getDeadLetterStrategy());
if (subscriptionRecoveryPolicy != null) { if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy()); SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy();
srp.setBroker(broker);
topic.setSubscriptionRecoveryPolicy(srp);
} }
if (memoryLimit > 0) { if (memoryLimit > 0) {
topic.getMemoryUsage().setLimit(memoryLimit); topic.getMemoryUsage().setLimit(memoryLimit);
@ -127,7 +129,7 @@ public class PolicyEntry extends DestinationMapEntry {
topic.setLazyDispatch(isLazyDispatch()); topic.setLazyDispatch(isLazyDispatch());
} }
public void baseConfiguration(BaseDestination destination) { public void baseConfiguration(Broker broker,BaseDestination destination) {
destination.setProducerFlowControl(isProducerFlowControl()); destination.setProducerFlowControl(isProducerFlowControl());
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
destination.setEnableAudit(isEnableAudit()); destination.setEnableAudit(isEnableAudit());
@ -148,7 +150,11 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setMaxExpirePageSize(getMaxExpirePageSize()); destination.setMaxExpirePageSize(getMaxExpirePageSize());
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
destination.setSlowConsumerStrategy(getSlowConsumerStrategy()); SlowConsumerStrategy scs = getSlowConsumerStrategy();
if (scs != null) {
scs.setScheduler(broker.getScheduler());
}
destination.setSlowConsumerStrategy(scs);
} }
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {

View File

@ -17,12 +17,11 @@
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQMessageTransformation; import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -50,9 +49,9 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover
private static final Log LOG = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class); private static final Log LOG = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class);
private MessageQuery query; private MessageQuery query;
private AtomicLong messageSequence = new AtomicLong(0); private final AtomicLong messageSequence = new AtomicLong(0);
private IdGenerator idGenerator = new IdGenerator(); private final IdGenerator idGenerator = new IdGenerator();
private ProducerId producerId = createProducerId(); private final ProducerId producerId = createProducerId();
public SubscriptionRecoveryPolicy copy() { public SubscriptionRecoveryPolicy copy() {
QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy(); QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy();
@ -100,6 +99,9 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover
return new org.apache.activemq.command.Message[0]; return new org.apache.activemq.command.Message[0];
} }
public void setBroker(Broker broker) {
}
protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) { protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) {
try { try {
ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null); ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null);

View File

@ -2,6 +2,7 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.thread.Scheduler;
/* /*
* a strategy for dealing with slow consumers * a strategy for dealing with slow consumers
@ -9,5 +10,6 @@ import org.apache.activemq.broker.region.Subscription;
public interface SlowConsumerStrategy { public interface SlowConsumerStrategy {
void slowConsumer(ConnectionContext context, Subscription subs); void slowConsumer(ConnectionContext context, Subscription subs);
void setScheduler(Scheduler scheduler);
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
@ -69,4 +70,6 @@ public interface SubscriptionRecoveryPolicy extends Service {
* @return the copy * @return the copy
*/ */
SubscriptionRecoveryPolicy copy(); SubscriptionRecoveryPolicy copy();
void setBroker(Broker broker);
} }

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.SubscriptionRecovery;
@ -28,7 +29,6 @@ import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
/** /**
@ -42,7 +42,7 @@ import org.apache.activemq.thread.Scheduler;
public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
private static final int GC_INTERVAL = 1000; private static final int GC_INTERVAL = 1000;
protected static final Scheduler scheduler = Scheduler.getInstance(); private Scheduler scheduler;
// TODO: need to get a better synchronized linked list that has little // TODO: need to get a better synchronized linked list that has little
// contention between enqueuing and dequeuing // contention between enqueuing and dequeuing
@ -90,6 +90,10 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
} }
} }
public void setBroker(Broker broker) {
this.scheduler = broker.getScheduler();
}
public void start() throws Exception { public void start() throws Exception {
scheduler.executePeriodically(gcTask, GC_INTERVAL); scheduler.executePeriodically(gcTask, GC_INTERVAL);
} }
@ -98,6 +102,7 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
scheduler.cancel(gcTask); scheduler.cancel(gcTask);
} }
public void gc() { public void gc() {
lastGCRun = System.currentTimeMillis(); lastGCRun = System.currentTimeMillis();
while (buffer.size() > 0) { while (buffer.size() > 0) {

View File

@ -35,7 +35,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand; import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey; import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
@ -75,7 +74,7 @@ public class AsyncDataManager {
public static final int PREFERED_DIFF = 1024 * 512; public static final int PREFERED_DIFF = 1024 * 512;
private static final Log LOG = LogFactory.getLog(AsyncDataManager.class); private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
protected static Scheduler scheduler = Scheduler.getInstance(); protected Scheduler scheduler;
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>(); protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
@ -193,7 +192,13 @@ public class AsyncDataManager {
cleanup(); cleanup();
} }
}; };
scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); this.scheduler = new Scheduler("AsyncDataManager Scheduler");
try {
this.scheduler.start();
} catch (Exception e) {
throw new IOException(e);
}
this.scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
} }
public void lock() throws IOException { public void lock() throws IOException {
@ -328,7 +333,12 @@ public class AsyncDataManager {
if (!started) { if (!started) {
return; return;
} }
scheduler.cancel(cleanupTask); this.scheduler.cancel(cleanupTask);
try {
this.scheduler.stop();
} catch (Exception e) {
throw new IOException(e);
}
accessorPool.close(); accessorPool.close();
storeState(false); storeState(false);
appender.close(); appender.close();
@ -376,7 +386,7 @@ public class AsyncDataManager {
public synchronized void addInterestInFile(int file) throws IOException { public synchronized void addInterestInFile(int file) throws IOException {
if (file >= 0) { if (file >= 0) {
Integer key = Integer.valueOf(file); Integer key = Integer.valueOf(file);
DataFile dataFile = (DataFile)fileMap.get(key); DataFile dataFile = fileMap.get(key);
if (dataFile == null) { if (dataFile == null) {
throw new IOException("That data file does not exist"); throw new IOException("That data file does not exist");
} }
@ -393,7 +403,7 @@ public class AsyncDataManager {
public synchronized void removeInterestInFile(int file) throws IOException { public synchronized void removeInterestInFile(int file) throws IOException {
if (file >= 0) { if (file >= 0) {
Integer key = Integer.valueOf(file); Integer key = Integer.valueOf(file);
DataFile dataFile = (DataFile)fileMap.get(key); DataFile dataFile = fileMap.get(key);
removeInterestInFile(dataFile); removeInterestInFile(dataFile);
} }
@ -414,7 +424,7 @@ public class AsyncDataManager {
List<DataFile> purgeList = new ArrayList<DataFile>(); List<DataFile> purgeList = new ArrayList<DataFile>();
for (Integer key : unUsed) { for (Integer key : unUsed) {
DataFile dataFile = (DataFile)fileMap.get(key); DataFile dataFile = fileMap.get(key);
purgeList.add(dataFile); purgeList.add(dataFile);
} }
for (DataFile dataFile : purgeList) { for (DataFile dataFile : purgeList) {
@ -432,7 +442,7 @@ public class AsyncDataManager {
for (Integer key : unUsed) { for (Integer key : unUsed) {
// Only add files less than the lastFile.. // Only add files less than the lastFile..
if( key.intValue() < lastFile.intValue() ) { if( key.intValue() < lastFile.intValue() ) {
DataFile dataFile = (DataFile)fileMap.get(key); DataFile dataFile = fileMap.get(key);
purgeList.add(dataFile); purgeList.add(dataFile);
} }
} }
@ -499,6 +509,7 @@ public class AsyncDataManager {
this.maxFileLength = maxFileLength; this.maxFileLength = maxFileLength;
} }
@Override
public String toString() { public String toString() {
return "DataManager:(" + filePrefix + ")"; return "DataManager:(" + filePrefix + ")";
} }

View File

@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
@ -58,7 +57,6 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore; import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter; import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
@ -85,7 +83,7 @@ import org.apache.commons.logging.LogFactory;
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware { public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class); private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
private static final Scheduler scheduler = Scheduler.getInstance(); private Scheduler scheduler;
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>(); private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>(); private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
@ -99,7 +97,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private SystemUsage usageManager; private SystemUsage usageManager;
private long checkpointInterval = 1000 * 20; private long checkpointInterval = 1000 * 20;
private int maxCheckpointMessageAddSize = 1024 * 4; private int maxCheckpointMessageAddSize = 1024 * 4;
private AMQTransactionStore transactionStore = new AMQTransactionStore(this); private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
private TaskRunner checkpointTask; private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
@ -112,7 +110,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private File directory; private File directory;
private File directoryArchive; private File directoryArchive;
private BrokerService brokerService; private BrokerService brokerService;
private AtomicLong storeSize = new AtomicLong(); private final AtomicLong storeSize = new AtomicLong();
private boolean persistentIndex=true; private boolean persistentIndex=true;
private boolean useNio = true; private boolean useNio = true;
private boolean archiveDataLogs=false; private boolean archiveDataLogs=false;
@ -124,8 +122,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH; private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
private Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> (); private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
private String directoryPath = "";
private RandomAccessFile lockFile; private RandomAccessFile lockFile;
private FileLock lock; private FileLock lock;
private boolean disableLocking = DISABLE_LOCKING; private boolean disableLocking = DISABLE_LOCKING;
@ -134,6 +131,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private boolean lockAquired; private boolean lockAquired;
private boolean recoverReferenceStore=true; private boolean recoverReferenceStore=true;
private boolean forceRecoverReferenceStore=false; private boolean forceRecoverReferenceStore=false;
private boolean useDedicatedTaskRunner=false;
private int journalThreadPriority = Thread.MAX_PRIORITY;
public String getBrokerName() { public String getBrokerName() {
return this.brokerName; return this.brokerName;
@ -165,12 +164,19 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
} else { } else {
this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName)); this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
this.directory = new File(directory, "amqstore"); this.directory = new File(directory, "amqstore");
this.directoryPath=directory.getAbsolutePath(); directory.getAbsolutePath();
} }
} }
if (this.directoryArchive == null) { if (this.directoryArchive == null) {
this.directoryArchive = new File(this.directory,"archive"); this.directoryArchive = new File(this.directory,"archive");
} }
if (this.brokerService != null) {
this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
}else {
this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
}
this.taskRunnerFactory= new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
true, 1000, isUseDedicatedTaskRunner());
IOHelper.mkdirs(this.directory); IOHelper.mkdirs(this.directory);
lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
lock(); lock();
@ -192,8 +198,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
referenceStoreAdapter.setBrokerName(getBrokerName()); referenceStoreAdapter.setBrokerName(getBrokerName());
referenceStoreAdapter.setUsageManager(usageManager); referenceStoreAdapter.setUsageManager(usageManager);
referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength()); referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
if (taskRunnerFactory == null) {
taskRunnerFactory = createTaskRunnerFactory(); if (brokerService != null) {
this.scheduler = this.brokerService.getBroker().getScheduler();
} }
if (failIfJournalIsLocked) { if (failIfJournalIsLocked) {
@ -389,7 +396,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
while (queueIterator.hasNext()) { while (queueIterator.hasNext()) {
final AMQMessageStore ms = queueIterator.next(); final AMQMessageStore ms = queueIterator.next();
Location mark = (Location)ms.getMark(); Location mark = ms.getMark();
if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
newMark = mark; newMark = mark;
} }
@ -397,7 +404,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator(); Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
while (topicIterator.hasNext()) { while (topicIterator.hasNext()) {
final AMQTopicMessageStore ms = topicIterator.next(); final AMQTopicMessageStore ms = topicIterator.next();
Location mark = (Location)ms.getMark(); Location mark = ms.getMark();
if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
newMark = mark; newMark = mark;
} }
@ -726,6 +733,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
deleteAllMessages = true; deleteAllMessages = true;
} }
@Override
public String toString() { public String toString() {
return "AMQPersistenceAdapter(" + directory + ")"; return "AMQPersistenceAdapter(" + directory + ")";
} }
@ -754,10 +762,6 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return adaptor; return adaptor;
} }
protected TaskRunnerFactory createTaskRunnerFactory() {
return DefaultThreadPools.getDefaultTaskRunnerFactory();
}
// ///////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////
// Property Accessors // Property Accessors
// ///////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////
@ -992,6 +996,28 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
this.forceRecoverReferenceStore = forceRecoverReferenceStore; this.forceRecoverReferenceStore = forceRecoverReferenceStore;
} }
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
/**
* @return the journalThreadPriority
*/
public int getJournalThreadPriority() {
return this.journalThreadPriority;
}
/**
* @param journalThreadPriority the journalThreadPriority to set
*/
public void setJournalThreadPriority(int journalThreadPriority) {
this.journalThreadPriority = journalThreadPriority;
}
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) { protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store); Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);

View File

@ -17,7 +17,6 @@
package org.apache.activemq.store.amq; package org.apache.activemq.store.amq;
import java.io.File; import java.io.File;
import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.index.hash.HashIndex; import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -35,7 +34,6 @@ import org.apache.activemq.util.IOHelper;
*/ */
public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024; static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024;
private TaskRunnerFactory taskRunnerFactory;
private File dataDirectory; private File dataDirectory;
private int journalThreadPriority = Thread.MAX_PRIORITY; private int journalThreadPriority = Thread.MAX_PRIORITY;
private String brokerName = "localhost"; private String brokerName = "localhost";
@ -56,6 +54,7 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
private boolean forceRecoverReferenceStore=false; private boolean forceRecoverReferenceStore=false;
private long checkpointInterval = 1000 * 20; private long checkpointInterval = 1000 * 20;
private boolean useDedicatedTaskRunner; private boolean useDedicatedTaskRunner;
private TaskRunnerFactory taskRunnerFactory;
/** /**
@ -82,6 +81,8 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
result.setMaxReferenceFileLength(getMaxReferenceFileLength()); result.setMaxReferenceFileLength(getMaxReferenceFileLength());
result.setForceRecoverReferenceStore(isForceRecoverReferenceStore()); result.setForceRecoverReferenceStore(isForceRecoverReferenceStore());
result.setRecoverReferenceStore(isRecoverReferenceStore()); result.setRecoverReferenceStore(isRecoverReferenceStore());
result.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
result.setJournalThreadPriority(getJournalThreadPriority());
return result; return result;
} }
@ -122,10 +123,6 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
* @return the taskRunnerFactory * @return the taskRunnerFactory
*/ */
public TaskRunnerFactory getTaskRunnerFactory() { public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority,
true, 1000, isUseDedicatedTaskRunner());
}
return taskRunnerFactory; return taskRunnerFactory;
} }

View File

@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.InvalidRecordLocationException; import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener; import org.apache.activeio.journal.JournalEventListener;
@ -64,9 +63,9 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener; import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
@ -85,7 +84,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
private BrokerService brokerService; private BrokerService brokerService;
protected static final Scheduler scheduler = Scheduler.getInstance(); protected Scheduler scheduler;
private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class); private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
private Journal journal; private Journal journal;
@ -97,20 +96,20 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>(); private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
private SystemUsage usageManager; private SystemUsage usageManager;
private long checkpointInterval = 1000 * 60 * 5; private final long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis(); private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10; private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 1024 * 1024; private int maxCheckpointMessageAddSize = 1024 * 1024;
private JournalTransactionStore transactionStore = new JournalTransactionStore(this); private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor; private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask; private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private boolean fullCheckPoint; private boolean fullCheckPoint;
private AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
@ -267,7 +266,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
recover(); recover();
// Do a checkpoint periodically. // Do a checkpoint periodically.
scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); this.scheduler = new Scheduler("Journal Scheduler");
this.scheduler.start();
this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
} }
@ -278,7 +279,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return; return;
} }
scheduler.cancel(periodicCheckpointTask); this.scheduler.cancel(periodicCheckpointTask);
this.scheduler.stop();
// Take one final checkpoint and stop checkpoint processing. // Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true); checkpoint(true, true);
@ -723,6 +725,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
longTermPersistence.setBrokerName(brokerName); longTermPersistence.setBrokerName(brokerName);
} }
@Override
public String toString() { public String toString() {
return "JournalPersistenceAdapator(" + longTermPersistence + ")"; return "JournalPersistenceAdapator(" + longTermPersistence + ")";
} }

View File

@ -415,6 +415,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
@Override @Override
public String toString() { public String toString() {
return "KahaDBPersistenceAdapter"; String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
return "KahaDBPersistenceAdapter[" + path +"]" ;
} }
} }

View File

@ -72,6 +72,7 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
@ -94,6 +95,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
private boolean concurrentStoreAndDispatchQueues = true; private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true; private boolean concurrentStoreAndDispatchTopics = true;
private int maxAsyncJobs = MAX_ASYNC_JOBS; private int maxAsyncJobs = MAX_ASYNC_JOBS;
private Scheduler scheduler;
public KahaDBStore() { public KahaDBStore() {
@ -155,6 +157,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override @Override
public void doStart() throws Exception { public void doStart() throws Exception {
super.doStart();
this.queueSemaphore = new Semaphore(getMaxAsyncJobs()); this.queueSemaphore = new Semaphore(getMaxAsyncJobs());
this.topicSemaphore = new Semaphore(getMaxAsyncJobs()); this.topicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
@ -175,8 +178,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return thread; return thread;
} }
}); });
super.doStart();
} }
@Override @Override
@ -204,6 +205,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
protected void addQueueTask(StoreQueueTask task) throws IOException { protected void addQueueTask(StoreQueueTask task) throws IOException {
try { try {
this.queueSemaphore.acquire(); this.queueSemaphore.acquire();
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage()); throw new InterruptedIOException(e.getMessage());
} }
@ -327,7 +329,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired()); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
} }

View File

@ -82,7 +82,7 @@ import org.apache.kahadb.util.VariableMarshaller;
public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
private BrokerService brokerService; protected BrokerService brokerService;
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500")); public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@ -245,7 +245,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// to see if we need to exit this thread. // to see if we need to exit this thread.
long sleepTime = Math.min(checkpointInterval, 500); long sleepTime = Math.min(checkpointInterval, 500);
while (opened.get()) { while (opened.get()) {
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if( now - lastCleanup >= cleanupInterval ) { if( now - lastCleanup >= cleanupInterval ) {
@ -276,9 +275,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
public void open() throws IOException { public void open() throws IOException {
if( opened.compareAndSet(false, true) ) { if( opened.compareAndSet(false, true) ) {
getJournal().start(); getJournal().start();
loadPageFile(); loadPageFile();
startCheckpoint(); startCheckpoint();
recover(); recover();
} }
@ -332,6 +329,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
public void close() throws IOException, InterruptedException { public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) { if( opened.compareAndSet(true, false)) {
synchronized (indexMutex) { synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, true);
}
});
pageFile.unload(); pageFile.unload();
metadata = new Metadata(); metadata = new Metadata();
} }
@ -385,11 +387,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
*/ */
private void recover() throws IllegalStateException, IOException { private void recover() throws IllegalStateException, IOException {
synchronized (indexMutex) { synchronized (indexMutex) {
long start = System.currentTimeMillis();
long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition(); Location recoveryPosition = getRecoveryPosition();
if( recoveryPosition!=null ) { if( recoveryPosition!=null ) {
int redoCounter = 0; int redoCounter = 0;
LOG.info("Recoverying from the journal ...");
while (recoveryPosition != null) { while (recoveryPosition != null) {
JournalCommand message = load(recoveryPosition); JournalCommand message = load(recoveryPosition);
metadata.lastUpdate = recoveryPosition; metadata.lastUpdate = recoveryPosition;
@ -398,7 +401,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
recoveryPosition = journal.getNextLocation(recoveryPosition); recoveryPosition = journal.getNextLocation(recoveryPosition);
} }
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
} }
// We may have to undo some index updates. // We may have to undo some index updates.
@ -693,7 +696,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// from the recovery method too so they need to be idempotent // from the recovery method too so they need to be idempotent
// ///////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////
private void process(JournalCommand data, final Location location) throws IOException { void process(JournalCommand data, final Location location) throws IOException {
data.visit(new Visitor() { data.visit(new Visitor() {
@Override @Override
public void visit(KahaAddMessageCommand command) throws IOException { public void visit(KahaAddMessageCommand command) throws IOException {
@ -732,11 +735,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}); });
} }
private void process(final KahaAddMessageCommand command, final Location location) throws IOException { protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) { if (command.hasTransactionInfo()) {
synchronized (indexMutex) { synchronized (indexMutex) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location); ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new AddOpperation(command, location)); inflightTx.add(new AddOpperation(command, location));
TransactionId key = key(command.getTransactionInfo());
} }
} else { } else {
synchronized (indexMutex) { synchronized (indexMutex) {
@ -836,7 +840,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected final Object indexMutex = new Object(); protected final Object indexMutex = new Object();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx); StoredDestination sd = getStoredDestination(command.getDestination(), tx);
// Skip adding the message to the index if this is a topic and there are // Skip adding the message to the index if this is a topic and there are
@ -870,7 +874,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx); StoredDestination sd = getStoredDestination(command.getDestination(), tx);
if (!command.hasSubscriptionKey()) { if (!command.hasSubscriptionKey()) {
@ -902,7 +906,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
} }
private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx); StoredDestination sd = getStoredDestination(command.getDestination(), tx);
sd.orderIndex.clear(tx); sd.orderIndex.clear(tx);
sd.orderIndex.unload(tx); sd.orderIndex.unload(tx);
@ -931,7 +935,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
metadata.destinations.remove(tx, key); metadata.destinations.remove(tx, key);
} }
private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx); StoredDestination sd = getStoredDestination(command.getDestination(), tx);
// If set then we are creating it.. otherwise we are destroying the sub // If set then we are creating it.. otherwise we are destroying the sub
@ -961,8 +965,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* @param tx * @param tx
* @throws IOException * @throws IOException
*/ */
private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
LOG.debug("Checkpoint started."); LOG.debug("Checkpoint started.");
metadata.state = OPEN_STATE; metadata.state = OPEN_STATE;

View File

@ -19,32 +19,25 @@ package org.apache.activemq.thread;
import java.util.HashMap; import java.util.HashMap;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
/** /**
* Singelton, references maintained by users
* @version $Revision$ * @version $Revision$
*/ */
public final class Scheduler { public final class Scheduler extends ServiceSupport {
private final String name;
private Timer timer;
private final HashMap<Runnable, TimerTask> timerTasks = new HashMap<Runnable, TimerTask>();
private final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true); public Scheduler (String name) {
private final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>(); this.name = name;
private static Scheduler instance;
static {
instance = new Scheduler();
} }
private Scheduler() { public void executePeriodically(final Runnable task, long period) {
}
public static Scheduler getInstance() {
return instance;
}
public synchronized void executePeriodically(final Runnable task, long period) {
TimerTask timerTask = new SchedulerTimerTask(task); TimerTask timerTask = new SchedulerTimerTask(task);
CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period); timer.scheduleAtFixedRate(timerTask, period, period);
TIMER_TASKS.put(task, timerTask); timerTasks.put(task, timerTask);
} }
/* /*
@ -53,24 +46,38 @@ public final class Scheduler {
*/ */
public synchronized void schedualPeriodically(final Runnable task, long period) { public synchronized void schedualPeriodically(final Runnable task, long period) {
TimerTask timerTask = new SchedulerTimerTask(task); TimerTask timerTask = new SchedulerTimerTask(task);
CLOCK_DAEMON.schedule(timerTask, period, period); timer.schedule(timerTask, period, period);
TIMER_TASKS.put(task, timerTask); timerTasks.put(task, timerTask);
} }
public synchronized void cancel(Runnable task) { public synchronized void cancel(Runnable task) {
TimerTask ticket = TIMER_TASKS.remove(task); TimerTask ticket = timerTasks.remove(task);
if (ticket != null) { if (ticket != null) {
ticket.cancel(); ticket.cancel();
CLOCK_DAEMON.purge();//remove cancelled TimerTasks timer.purge();//remove cancelled TimerTasks
} }
} }
public void executeAfterDelay(final Runnable task, long redeliveryDelay) { public synchronized void executeAfterDelay(final Runnable task, long redeliveryDelay) {
TimerTask timerTask = new SchedulerTimerTask(task); TimerTask timerTask = new SchedulerTimerTask(task);
CLOCK_DAEMON.schedule(timerTask, redeliveryDelay); timer.schedule(timerTask, redeliveryDelay);
} }
public void shutdown() { public void shutdown() {
CLOCK_DAEMON.cancel(); timer.cancel();
}
@Override
protected synchronized void doStart() throws Exception {
this.timer = new Timer(name, true);
}
@Override
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
if (this.timer != null) {
this.timer.cancel();
}
} }
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.usage;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.store.kahadb.plist.PListStore;
@ -36,6 +37,7 @@ public class SystemUsage implements Service {
private MemoryUsage memoryUsage; private MemoryUsage memoryUsage;
private StoreUsage storeUsage; private StoreUsage storeUsage;
private TempUsage tempUsage; private TempUsage tempUsage;
private ThreadPoolExecutor executor;
/** /**
* True if someone called setSendFailIfNoSpace() on this particular usage * True if someone called setSendFailIfNoSpace() on this particular usage
@ -58,14 +60,21 @@ public class SystemUsage implements Service {
this.memoryUsage = new MemoryUsage(name + ":memory"); this.memoryUsage = new MemoryUsage(name + ":memory");
this.storeUsage = new StoreUsage(name + ":store", adapter); this.storeUsage = new StoreUsage(name + ":store", adapter);
this.tempUsage = new TempUsage(name + ":temp", tempStore); this.tempUsage = new TempUsage(name + ":temp", tempStore);
this.memoryUsage.setExecutor(getExecutor());
this.storeUsage.setExecutor(getExecutor());
this.tempUsage.setExecutor(getExecutor());
} }
public SystemUsage(SystemUsage parent, String name) { public SystemUsage(SystemUsage parent, String name) {
this.parent = parent; this.parent = parent;
this.executor = parent.getExecutor();
this.name = name; this.name = name;
this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory"); this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory");
this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store"); this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store");
this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp"); this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp");
this.memoryUsage.setExecutor(getExecutor());
this.storeUsage.setExecutor(getExecutor());
this.tempUsage.setExecutor(getExecutor());
} }
public String getName() { public String getName() {
@ -186,6 +195,7 @@ public class SystemUsage implements Service {
memoryUsage.setParent(parent.memoryUsage); memoryUsage.setParent(parent.memoryUsage);
} }
this.memoryUsage = memoryUsage; this.memoryUsage = memoryUsage;
this.memoryUsage.setExecutor(getExecutor());
} }
public void setStoreUsage(StoreUsage storeUsage) { public void setStoreUsage(StoreUsage storeUsage) {
@ -199,6 +209,7 @@ public class SystemUsage implements Service {
storeUsage.setParent(parent.storeUsage); storeUsage.setParent(parent.storeUsage);
} }
this.storeUsage = storeUsage; this.storeUsage = storeUsage;
this.storeUsage.setExecutor(executor);
} }
@ -213,5 +224,30 @@ public class SystemUsage implements Service {
tempDiskUsage.setParent(parent.tempUsage); tempDiskUsage.setParent(parent.tempUsage);
} }
this.tempUsage = tempDiskUsage; this.tempUsage = tempDiskUsage;
this.tempUsage.setExecutor(getExecutor());
}
/**
* @return the executor
*/
public ThreadPoolExecutor getExecutor() {
return this.executor;
}
/**
* @param executor
* the executor to set
*/
public void setExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
if (this.memoryUsage != null) {
this.memoryUsage.setExecutor(this.executor);
}
if (this.storeUsage != null) {
this.storeUsage.setExecutor(this.executor);
}
if (this.tempUsage != null) {
this.tempUsage.setExecutor(this.executor);
}
} }
} }

View File

@ -21,13 +21,8 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -43,7 +38,6 @@ import org.apache.commons.logging.LogFactory;
public abstract class Usage<T extends Usage> implements Service { public abstract class Usage<T extends Usage> implements Service {
private static final Log LOG = LogFactory.getLog(Usage.class); private static final Log LOG = LogFactory.getLog(Usage.class);
private static ThreadPoolExecutor executor;
protected final Object usageMutex = new Object(); protected final Object usageMutex = new Object();
protected int percentUsage; protected int percentUsage;
protected T parent; protected T parent;
@ -53,12 +47,11 @@ public abstract class Usage<T extends Usage> implements Service {
private final boolean debug = LOG.isDebugEnabled(); private final boolean debug = LOG.isDebugEnabled();
private String name; private String name;
private float usagePortion = 1.0f; private float usagePortion = 1.0f;
private List<T> children = new CopyOnWriteArrayList<T>(); private final List<T> children = new CopyOnWriteArrayList<T>();
private final List<Runnable> callbacks = new LinkedList<Runnable>(); private final List<Runnable> callbacks = new LinkedList<Runnable>();
private int pollingTime = 100; private int pollingTime = 100;
private final AtomicBoolean started=new AtomicBoolean();
private AtomicBoolean started=new AtomicBoolean(); private ThreadPoolExecutor executor;
public Usage(T parent, String name, float portion) { public Usage(T parent, String name, float portion) {
this.parent = parent; this.parent = parent;
this.usagePortion = portion; this.usagePortion = portion;
@ -289,6 +282,7 @@ public abstract class Usage<T extends Usage> implements Service {
return name; return name;
} }
@Override
public String toString() { public String toString() {
return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%"; return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
} }
@ -411,18 +405,10 @@ public abstract class Usage<T extends Usage> implements Service {
this.parent = parent; this.parent = parent;
} }
protected Executor getExecutor() { public void setExecutor (ThreadPoolExecutor executor) {
this.executor = executor;
}
public ThreadPoolExecutor getExecutor() {
return executor; return executor;
} }
static {
executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Usage Async Task");
thread.setDaemon(true);
return thread;
}
});
}
} }

View File

@ -16,14 +16,23 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.IOHelper;
public class BrokerRestartTestSupport extends BrokerTestSupport { public class BrokerRestartTestSupport extends BrokerTestSupport {
private PersistenceAdapter persistenceAdapter; private PersistenceAdapter persistenceAdapter;
@Override
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
File dir = broker.getBrokerDataDirectory();
if (dir != null) {
IOHelper.deleteChildren(dir);
}
//broker.setPersistent(false); //broker.setPersistent(false);
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(true);
persistenceAdapter = broker.getPersistenceAdapter(); persistenceAdapter = broker.getPersistenceAdapter();

View File

@ -17,13 +17,14 @@
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import java.io.File; import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -41,9 +42,7 @@ import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
@ -121,7 +120,7 @@ public class DurableConsumerTest extends CombinationTestSupport{
} }
private class MessagePublisher implements Runnable{ private class MessagePublisher implements Runnable{
private boolean shouldPublish = true; private final boolean shouldPublish = true;
public void run(){ public void run(){
TopicConnectionFactory topicConnectionFactory = null; TopicConnectionFactory topicConnectionFactory = null;
@ -170,13 +169,14 @@ public class DurableConsumerTest extends CombinationTestSupport{
Thread publisherThread = new Thread(new MessagePublisher()); Thread publisherThread = new Thread(new MessagePublisher());
publisherThread.start(); publisherThread.start();
final List<SimpleTopicSubscriber> list = new ArrayList<SimpleTopicSubscriber>();
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
final int id = i; final int id = i;
Thread thread = new Thread(new Runnable(){ Thread thread = new Thread(new Runnable(){
public void run(){ public void run(){
new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME); SimpleTopicSubscriber s =new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
list.add(s);
} }
}); });
thread.start(); thread.start();
@ -189,6 +189,9 @@ public class DurableConsumerTest extends CombinationTestSupport{
configurePersistence(broker); configurePersistence(broker);
broker.start(); broker.start();
Thread.sleep(10000); Thread.sleep(10000);
for (SimpleTopicSubscriber s:list) {
s.closeConnection();
}
assertEquals(0, exceptions.size()); assertEquals(0, exceptions.size());
} }
@ -358,6 +361,7 @@ public class DurableConsumerTest extends CombinationTestSupport{
} }
@Override
protected void setUp() throws Exception{ protected void setUp() throws Exception{
if (broker == null) { if (broker == null) {
broker = createBroker(true); broker = createBroker(true);
@ -366,6 +370,7 @@ public class DurableConsumerTest extends CombinationTestSupport{
super.setUp(); super.setUp();
} }
@Override
protected void tearDown() throws Exception{ protected void tearDown() throws Exception{
super.tearDown(); super.tearDown();
if (broker != null) { if (broker != null) {
@ -392,11 +397,13 @@ public class DurableConsumerTest extends CombinationTestSupport{
protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{ protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{
answer.setDeleteAllMessagesOnStartup(deleteStore); answer.setDeleteAllMessagesOnStartup(deleteStore);
KahaDBStore kaha = new KahaDBStore(); KahaDBStore kaha = new KahaDBStore();
//kaha.setConcurrentStoreAndDispatchTopics(false);
File directory = new File("target/activemq-data/kahadb"); File directory = new File("target/activemq-data/kahadb");
if (deleteStore) { if (deleteStore) {
IOHelper.deleteChildren(directory); IOHelper.deleteChildren(directory);
} }
kaha.setDirectory(directory); kaha.setDirectory(directory);
//kaha.setMaxAsyncJobs(10);
answer.setPersistenceAdapter(kaha); answer.setPersistenceAdapter(kaha);
answer.addConnector(bindAddress); answer.addConnector(bindAddress);

View File

@ -26,7 +26,6 @@ import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.thread.Scheduler;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
public class EmbeddedActiveMQ public class EmbeddedActiveMQ
@ -39,6 +38,7 @@ public class EmbeddedActiveMQ
BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
BrokerService brokerService = null; BrokerService brokerService = null;
Connection connection = null;
logger.info("Start..."); logger.info("Start...");
try try
@ -49,7 +49,7 @@ public class EmbeddedActiveMQ
logger.info("Broker '" + brokerService.getBrokerName() + "' is starting........"); logger.info("Broker '" + brokerService.getBrokerName() + "' is starting........");
brokerService.start(); brokerService.start();
ConnectionFactory fac = new ActiveMQConnectionFactory("vm://TestMQ"); ConnectionFactory fac = new ActiveMQConnectionFactory("vm://TestMQ");
Connection connection = fac.createConnection(); connection = fac.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue("TEST.QUEUE"); Destination queue = session.createQueue("TEST.QUEUE");
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -71,12 +71,9 @@ public class EmbeddedActiveMQ
try try
{ {
br.close(); br.close();
Scheduler scheduler = Scheduler.getInstance();
scheduler.shutdown();
logger.info("Broker '" + brokerService.getBrokerName() + "' is stopping........"); logger.info("Broker '" + brokerService.getBrokerName() + "' is stopping........");
connection.close();
brokerService.stop(); brokerService.stop();
Scheduler.getInstance().shutdown();
sleep(8); sleep(8);
logger.info(ThreadExplorer.show("Active threads after stop:")); logger.info(ThreadExplorer.show("Active threads after stop:"));
@ -90,7 +87,7 @@ public class EmbeddedActiveMQ
logger.info("Waiting for list theads is greater then 1 ..."); logger.info("Waiting for list theads is greater then 1 ...");
int numTh = ThreadExplorer.active(); int numTh = ThreadExplorer.active();
while (numTh > 1) while (numTh > 2)
{ {
sleep(3); sleep(3);
numTh = ThreadExplorer.active(); numTh = ThreadExplorer.active();

View File

@ -18,15 +18,16 @@ package org.apache.activemq.network;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.TemporaryQueue; import javax.jms.TemporaryQueue;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
public class DuplexNetworkTest extends SimpleNetworkTest { public class DuplexNetworkTest extends SimpleNetworkTest {
@Override
protected String getLocalBrokerURI() { protected String getLocalBrokerURI() {
return "org/apache/activemq/network/duplexLocalBroker.xml"; return "org/apache/activemq/network/duplexLocalBroker.xml";
} }
@Override
protected BrokerService createRemoteBroker() throws Exception { protected BrokerService createRemoteBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setBrokerName("remoteBroker"); broker.setBrokerName("remoteBroker");

View File

@ -17,7 +17,6 @@
package org.apache.activemq.network; package org.apache.activemq.network;
import java.net.URI; import java.net.URI;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
@ -30,9 +29,7 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.TopicRequestor; import javax.jms.TopicRequestor;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
@ -174,11 +171,13 @@ public class SimpleNetworkTest extends TestCase {
} }
} }
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
doSetUp(); doSetUp();
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
localBroker.deleteAllMessages(); localBroker.deleteAllMessages();
remoteBroker.deleteAllMessages(); remoteBroker.deleteAllMessages();

View File

@ -20,10 +20,11 @@ package org.apache.activemq.usage;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -31,6 +32,7 @@ import org.junit.Test;
public class MemoryUsageTest { public class MemoryUsageTest {
MemoryUsage underTest; MemoryUsage underTest;
ThreadPoolExecutor executor;
@Test @Test
public final void testPercentUsageNeedsNoThread() { public final void testPercentUsageNeedsNoThread() {
@ -46,6 +48,7 @@ public class MemoryUsageTest {
public final void testAddUsageListenerStartsThread() throws Exception { public final void testAddUsageListenerStartsThread() throws Exception {
int activeThreadCount = Thread.activeCount(); int activeThreadCount = Thread.activeCount();
underTest = new MemoryUsage(); underTest = new MemoryUsage();
underTest.setExecutor(executor);
underTest.setLimit(10); underTest.setLimit(10);
underTest.start(); underTest.start();
final CountDownLatch called = new CountDownLatch(1); final CountDownLatch called = new CountDownLatch(1);
@ -67,11 +70,23 @@ public class MemoryUsageTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
underTest = new MemoryUsage(); underTest = new MemoryUsage();
this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Usage Async Task");
thread.setDaemon(true);
return thread;
}
});
underTest.setExecutor(this.executor);
} }
@After @After
public void tearDown() { public void tearDown() {
assertNotNull(underTest); assertNotNull(underTest);
underTest.stop(); underTest.stop();
if (this.executor != null) {
this.executor.shutdownNow();
}
} }
} }