improve async task concurrency, contention over indexMutex for transactions and taskMap, not per destination. Added some metrics enabled by system properties to help further diagnosis of journal usage

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@960060 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-07-02 17:11:00 +00:00
parent 554a01a420
commit 76f842d371
5 changed files with 106 additions and 45 deletions

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -84,10 +85,16 @@ import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
protected final Map<AsyncJobKey, StoreTopicTask> asyncTopicMap = new HashMap<AsyncJobKey, StoreTopicTask>();
protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
private final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
@ -183,7 +190,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
@ -191,7 +198,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
return thread;
}
});
this.topicExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
@ -208,21 +215,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
if (this.globalQueueSemaphore != null) {
this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
}
synchronized (this.asyncQueueMap) {
for (StoreQueueTask task : this.asyncQueueMap.values()) {
task.cancel();
synchronized (this.asyncQueueMaps) {
for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
synchronized (m) {
for (StoreTask task : m.values()) {
task.cancel();
}
}
}
this.asyncQueueMap.clear();
this.asyncQueueMaps.clear();
}
LOG.info("Stopping async topic tasks");
if (this.globalTopicSemaphore != null) {
this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
}
synchronized (this.asyncTopicMap) {
for (StoreTopicTask task : this.asyncTopicMap.values()) {
task.cancel();
synchronized (this.asyncTopicMaps) {
for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
synchronized (m) {
for (StoreTask task : m.values()) {
task.cancel();
}
}
}
this.asyncTopicMap.clear();
this.asyncTopicMaps.clear();
}
if (this.globalQueueSemaphore != null) {
this.globalQueueSemaphore.drainPermits();
@ -242,30 +257,30 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
StoreQueueTask task = null;
synchronized (this.asyncQueueMap) {
task = this.asyncQueueMap.remove(new AsyncJobKey(id, store.getDestination()));
synchronized (store.asyncTaskMap) {
task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
synchronized (this.asyncQueueMap) {
this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
synchronized (store.asyncTaskMap) {
store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.queueExecutor.execute(task);
}
protected StoreTopicTask removeTopicTask(KahaDBMessageStore store, MessageId id) {
protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
StoreTopicTask task = null;
synchronized (this.asyncTopicMap) {
task = this.asyncTopicMap.remove(new AsyncJobKey(id, store.getDestination()));
synchronized (store.asyncTaskMap) {
task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
protected void addTopicTask(KahaDBMessageStore store, StoreTopicTask task) throws IOException {
synchronized (this.asyncTopicMap) {
this.asyncTopicMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
synchronized (store.asyncTaskMap) {
store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.topicExecutor.execute(task);
}
@ -275,9 +290,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
}
public class KahaDBMessageStore extends AbstractMessageStore {
protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
double doneTasks, canceledTasks = 0;
public KahaDBMessageStore(ActiveMQDestination destination) {
super(destination);
@ -309,8 +327,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
if (isConcurrentStoreAndDispatchQueues()) {
AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
StoreQueueTask task = null;
synchronized (asyncQueueMap) {
task = asyncQueueMap.get(key);
synchronized (asyncTaskMap) {
task = (StoreQueueTask) asyncTaskMap.get(key);
}
if (task != null) {
if (!task.cancel()) {
@ -324,8 +342,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
}
removeMessage(context, ack);
} else {
synchronized (asyncQueueMap) {
asyncQueueMap.remove(key);
synchronized (asyncTaskMap) {
asyncTaskMap.remove(key);
}
}
} else {
@ -545,6 +563,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination);
this.subscriptionCount.set(getAllSubscriptions().length);
asyncTopicMaps.add(asyncTaskMap);
}
@Override
@ -566,15 +585,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
if (isConcurrentStoreAndDispatchTopics()) {
AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
StoreTopicTask task = null;
synchronized (asyncTopicMap) {
task = asyncTopicMap.get(key);
synchronized (asyncTaskMap) {
task = (StoreTopicTask) asyncTaskMap.get(key);
}
if (task != null) {
if (task.addSubscriptionKey(subscriptionKey)) {
removeTopicTask(this, messageId);
if (task.cancel()) {
synchronized (asyncTopicMap) {
asyncTopicMap.remove(key);
synchronized (asyncTaskMap) {
asyncTaskMap.remove(key);
}
}
}
@ -994,8 +1013,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
return destination.getPhysicalName() + "-" + id;
}
}
class StoreQueueTask implements Runnable {
interface StoreTask {
public boolean cancel();
}
class StoreQueueTask implements Runnable, StoreTask {
protected final Message message;
protected final ConnectionContext context;
protected final KahaDBMessageStore store;
@ -1044,11 +1067,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
}
public void run() {
this.store.doneTasks++;
try {
if (this.done.compareAndSet(false, true)) {
this.store.addMessage(context, message);
removeQueueTask(this.store, this.message.getMessageId());
this.future.complete();
} else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
System.err.println(this.store.dest.getName() + " cancelled: " + (this.store.canceledTasks/this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Exception e) {
this.future.setException(e);
@ -1128,6 +1155,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
@Override
public void run() {
this.store.doneTasks++;
try {
if (this.done.compareAndSet(false, true)) {
this.topicStore.addMessage(context, message);
@ -1140,6 +1168,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
}
removeTopicTask(this.topicStore, this.message.getMessageId());
this.future.complete();
} else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
System.err.println(this.store.dest.getName() + " cancelled: " + (this.store.canceledTasks/this.store.doneTasks) * 100);
this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Exception e) {
this.future.setException(e);

View File

@ -367,6 +367,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
*/
private Location getFirstInProgressTxLocation() {
Location l = null;
synchronized (inflightTransactions) {
if (!inflightTransactions.isEmpty()) {
l = inflightTransactions.values().iterator().next().get(0).getLocation();
}
@ -376,6 +377,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
l = t;
}
}
}
return l;
}
@ -746,7 +748,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
synchronized (indexMutex) {
synchronized (inflightTransactions) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new AddOpperation(command, location));
}
@ -763,7 +765,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
synchronized (indexMutex) {
synchronized (inflightTransactions) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new RemoveOpperation(command, location));
}
@ -801,16 +803,19 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected void process(KahaCommitCommand command, Location location) throws IOException {
TransactionId key = key(command.getTransactionInfo());
synchronized (indexMutex) {
ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
ArrayList<Operation> inflightTx = null;
synchronized (inflightTransactions) {
inflightTx = inflightTransactions.remove(key);
if (inflightTx == null) {
inflightTx = preparedTransactions.remove(key);
}
if (inflightTx == null) {
return;
}
}
if (inflightTx == null) {
return;
}
final ArrayList<Operation> messagingTx = inflightTx;
final ArrayList<Operation> messagingTx = inflightTx;
synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) {
@ -822,7 +827,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
protected void process(KahaPrepareCommand command, Location location) {
synchronized (indexMutex) {
synchronized (inflightTransactions) {
TransactionId key = key(command.getTransactionInfo());
ArrayList<Operation> tx = inflightTransactions.remove(key);
if (tx != null) {
@ -832,7 +837,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
protected void process(KahaRollbackCommand command, Location location) {
synchronized (indexMutex) {
synchronized (inflightTransactions) {
TransactionId key = key(command.getTransactionInfo());
ArrayList<Operation> tx = inflightTransactions.remove(key);
if (tx == null) {

View File

@ -86,8 +86,7 @@ public class AMQ2149Test extends TestCase {
public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService();
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setDataDirectory(dataDirFile);
configurePersistenceAdapter(broker);
SystemUsage usage = new SystemUsage();
MemoryUsage memoryUsage = new MemoryUsage();
@ -95,7 +94,7 @@ public class AMQ2149Test extends TestCase {
usage.setMemoryUsage(memoryUsage);
broker.setSystemUsage(usage);
broker.setPersistenceFactory(persistenceFactory);
broker.addConnector(BROKER_CONNECTOR);
broker.setBrokerName(getName());
@ -106,6 +105,12 @@ public class AMQ2149Test extends TestCase {
broker.start();
}
protected void configurePersistenceAdapter(BrokerService brokerService) {
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setDataDirectory(dataDirFile);
brokerService.setPersistenceFactory(persistenceFactory);
}
public void setUp() throws Exception {
dataDirFile = new File("target/"+ getName());
numtoSend = DEFAULT_NUM_TO_SEND;

View File

@ -47,5 +47,6 @@ public class KahaDBDurableTopicTest extends SimpleDurableTopicTest {
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(uri);
answer.setUseShutdownHook(false);
answer.setEnableStatistics(false);
}
}

View File

@ -257,7 +257,11 @@ class DataFileAppender {
// Otherwise wait for the queuedCommand to be null
try {
while (nextWriteBatch != null) {
final long start = System.currentTimeMillis();
enqueueMutex.wait();
if (maxStat > 0) {
System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
}
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
@ -295,6 +299,10 @@ class DataFileAppender {
}
public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
int statIdx = 0;
int[] stats = new int[maxStat];
/**
* The async processing loop that writes to the data files and does the
* force calls. Since the file sync() call is the slowest of all the
@ -376,6 +384,17 @@ class DataFileAppender {
// Now do the 1 big write.
file.seek(wb.offset);
if (maxStat > 0) {
if (statIdx < maxStat) {
stats[statIdx++] = sequence.getLength();
} else {
long all = 0;
for (;statIdx > 0;) {
all+= stats[--statIdx];
}
System.err.println("Ave writeSize: " + all/maxStat);
}
}
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
ReplicationTarget replicationTarget = journal.getReplicationTarget();