synchronization around creation of preparedTransactions

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@960645 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-07-05 17:18:10 +00:00
parent 95ba6efceb
commit 179bd54e22
1 changed files with 35 additions and 30 deletions

View File

@ -82,14 +82,16 @@ import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
@ -151,7 +153,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
}
/**
* @return the concurrentStoreAndDispatchTransactions
*/
@ -160,7 +162,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
}
/**
* @param concurrentStoreAndDispatchTransactions the concurrentStoreAndDispatchTransactions to set
* @param concurrentStoreAndDispatchTransactions
* the concurrentStoreAndDispatchTransactions to set
*/
public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
this.concurrentStoreAndDispatchTransactions = concurrentStoreAndDispatchTransactions;
@ -179,9 +182,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
public void setMaxAsyncJobs(int maxAsyncJobs) {
this.maxAsyncJobs = maxAsyncJobs;
}
@Override
public void doStart() throws Exception {
@ -190,16 +190,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
new ThreadFactory() {
this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncQueueJobQueue, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
thread.setDaemon(true);
return thread;
}
});
this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
new ThreadFactory() {
this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncTopicJobQueue, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
thread.setDaemon(true);
@ -210,7 +210,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
@Override
public void doStop(ServiceStopper stopper) throws Exception {
//drain down async jobs
// drain down async jobs
LOG.info("Stopping async queue tasks");
if (this.globalQueueSemaphore != null) {
this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
@ -294,7 +294,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
double doneTasks, canceledTasks = 0;
public KahaDBMessageStore(ActiveMQDestination destination) {
@ -697,14 +697,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
if (selector != null) {
selectorExpression = SelectorParser.parse(selector);
}
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
if (selectorExpression != null) {
MessageEvaluationContext ctx = new MessageEvaluationContext();
ctx.setMessageReference(loadMessage(entry.getValue().location));
if (selectorExpression.matches(ctx)) {
counter++;
}
if (selectorExpression != null) {
MessageEvaluationContext ctx = new MessageEvaluationContext();
ctx.setMessageReference(loadMessage(entry.getValue().location));
if (selectorExpression.matches(ctx)) {
counter++;
}
} else {
counter++;
}
@ -913,9 +914,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
KahaTransactionInfo rc = new KahaTransactionInfo();
// Link it up to the previous record that was part of the transaction.
ArrayList<Operation> tx = inflightTransactions.get(txid);
if (tx != null) {
rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
synchronized (inflightTransactions) {
ArrayList<Operation> tx = inflightTransactions.get(txid);
if (tx != null) {
rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
}
}
if (txid.isLocalTransaction()) {
@ -1013,11 +1016,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
return destination.getPhysicalName() + "-" + id;
}
}
interface StoreTask {
public boolean cancel();
}
class StoreQueueTask implements Runnable, StoreTask {
protected final Message message;
protected final ConnectionContext context;
@ -1074,7 +1077,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
removeQueueTask(this.store, this.message.getMessageId());
this.future.complete();
} else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
System.err.println(this.store.dest.getName() + " cancelled: " + (this.store.canceledTasks/this.store.doneTasks) * 100);
System.err.println(this.store.dest.getName() + " cancelled: "
+ (this.store.canceledTasks / this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Exception e) {
@ -1163,13 +1167,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
synchronized (this.subscriptionKeys) {
for (String key : this.subscriptionKeys) {
this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
}
}
removeTopicTask(this.topicStore, this.message.getMessageId());
this.future.complete();
} else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
System.err.println(this.store.dest.getName() + " cancelled: " + (this.store.canceledTasks/this.store.doneTasks) * 100);
System.err.println(this.store.dest.getName() + " cancelled: "
+ (this.store.canceledTasks / this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Exception e) {