allow journal write batching, resolve global transaction lock and journal lock, let writes accumulate on the datafileappender rather than at the store, also tie transaction completion and after commit processing together with a callback rather than with a global lock so that concurrent commits can batch their writes - rework of fix for https://issues.apache.org/activemq/browse/AMQ-2594

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@947657 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-05-24 14:48:55 +00:00
parent da0490b501
commit e1389a6acc
14 changed files with 101 additions and 57 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store;
import java.io.IOException;
import java.util.concurrent.FutureTask;
import org.apache.activemq.Service;
import org.apache.activemq.command.TransactionId;
@ -31,7 +32,7 @@ public interface TransactionStore extends Service {
void prepare(TransactionId txid) throws IOException;
void commit(TransactionId txid, boolean wasPrepared) throws IOException;
void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException;
void rollback(TransactionId txid) throws IOException;

View File

@ -99,7 +99,7 @@ public class AMQTransactionStore implements TransactionStore {
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
AMQTx tx;
if (wasPrepared) {
synchronized (preparedTransactions) {
@ -111,6 +111,7 @@ public class AMQTransactionStore implements TransactionStore {
}
}
if (tx == null) {
done.run();
return;
}
if (txid.isXATransaction()) {
@ -118,6 +119,7 @@ public class AMQTransactionStore implements TransactionStore {
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
}
done.run();
}
/**

View File

@ -176,7 +176,7 @@ public class JournalTransactionStore implements TransactionStore {
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
Tx tx;
if (wasPrepared) {
synchronized (preparedTransactions) {
@ -188,6 +188,7 @@ public class JournalTransactionStore implements TransactionStore {
}
}
if (tx == null) {
done.run();
return;
}
if (txid.isXATransaction()) {
@ -197,6 +198,7 @@ public class JournalTransactionStore implements TransactionStore {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
wasPrepared), true);
}
done.run();
}
/**

View File

@ -101,12 +101,13 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
KahaTransaction tx = getTx(txid);
if (tx != null) {
tx.commit(this);
removeTx(txid);
}
done.run();
}
/**

View File

@ -241,14 +241,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore() {
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true, done);
}
public void prepare(TransactionId txid) throws IOException {
store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true, null);
}
public void rollback(TransactionId txid) throws IOException {
store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false, null);
}
public void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
@ -333,7 +333,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
}
@ -345,13 +345,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
store(command, true);
store(command, true, null);
}
public Message getMessage(MessageId identity) throws IOException {
@ -519,7 +519,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
if (isConcurrentStoreAndDispatchTopics()) {
StoreTopicTask task = asyncTopicMap.get(messageId);
if (task != null) {
if (task.addSubscriptionKey(subscriptionKey)) {
removeTopicTask(messageId);
task.cancel();
@ -538,7 +537,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
store(command, false);
store(command, false, null);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@ -550,7 +549,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && true);
store(command, isEnableJournalDiskSyncs() && true, null);
this.subscriptionCount.incrementAndGet();
}
@ -558,7 +557,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
store(command, isEnableJournalDiskSyncs() && true);
store(command, isEnableJournalDiskSyncs() && true, null);
this.subscriptionCount.decrementAndGet();
}

View File

@ -630,7 +630,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// Methods call by the broker to update and query the store.
// /////////////////////////////////////////////////////////////////
public Location store(JournalCommand data) throws IOException {
return store(data, false);
return store(data, false, null);
}
/**
@ -638,8 +638,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* to a JournalMessage which is logged to the journal and then the data from
* the JournalMessage is used to update the index just like it would be done
* during a recovery process.
* @param done
*/
public Location store(JournalCommand data, boolean sync) throws IOException {
public Location store(JournalCommand data, boolean sync, Runnable done) throws IOException {
try {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@ -662,6 +663,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
LOG.info("KahaDB: Recovering checkpoint thread after exception");
startCheckpoint();
}
if (done != null) {
done.run();
}
return location;
} catch (IOException ioe) {
LOG.error("KahaDB failed to store to Journal", ioe);

View File

@ -72,8 +72,9 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore(){
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
processCommit(txid);
done.run();
}
public void prepare(TransactionId txid) throws IOException {
processPrepare(txid);

View File

@ -194,7 +194,7 @@ public class MemoryTransactionStore implements TransactionStore {
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
Tx tx;
if (wasPrepared) {
@ -204,9 +204,11 @@ public class MemoryTransactionStore implements TransactionStore {
}
if (tx == null) {
done.run();
return;
}
tx.commit();
done.run();
}

View File

@ -69,21 +69,8 @@ public class LocalTransaction extends Transaction {
context.getTransactions().remove(xid);
// Sync on transaction store to avoid out of order messages in the cursor
// https://issues.apache.org/activemq/browse/AMQ-2594
synchronized (transactionStore) {
transactionStore.commit(getTransactionId(), false);
try {
fireAfterCommit();
} catch (Throwable e) {
// I guess this could happen. Post commit task failed
// to execute properly.
LOG.warn("POST COMMIT FAILED: ", e);
XAException xae = new XAException("POST COMMIT FAILED");
xae.errorCode = XAException.XAER_RMERR;
xae.initCause(e);
throw xae;
}
}
transactionStore.commit(getTransactionId(), false, postCommitTask);
this.waitPostCommitDone(postCommitTask);
}
public void rollback() throws XAException, IOException {
@ -120,5 +107,9 @@ public class LocalTransaction extends Transaction {
public TransactionId getTransactionId() {
return xid;
}
@Override
public Log getLog() {
return LOG;
}
}

View File

@ -17,13 +17,18 @@
package org.apache.activemq.transaction;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.TransactionId;
import org.apache.commons.logging.Log;
/**
* Keeps track of all the actions the need to be done when a transaction does a
@ -31,7 +36,7 @@ import org.apache.activemq.command.TransactionId;
*
* @version $Revision: 1.5 $
*/
public abstract class Transaction {
public abstract class Transaction implements Callable {
public static final byte START_STATE = 0; // can go to: 1,2,3
public static final byte IN_USE_STATE = 1; // can go to: 2,3
@ -40,7 +45,8 @@ public abstract class Transaction {
private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private byte state = START_STATE;
protected FutureTask<?> postCommitTask = new FutureTask(this);
public byte getState() {
return state;
}
@ -108,6 +114,8 @@ public abstract class Transaction {
public abstract TransactionId getTransactionId();
public abstract Log getLog();
public boolean isPrepared() {
return getState() == PREPARED_STATE;
}
@ -115,4 +123,41 @@ public abstract class Transaction {
public int size() {
return synchronizations.size();
}
protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
try {
postCommitTask.get();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof XAException) {
throw (XAException) t;
} else if (t instanceof IOException) {
throw (IOException) t;
} else {
throw new XAException(e.toString());
}
}
}
protected void doPostCommit() throws XAException {
try {
fireAfterCommit();
} catch (Throwable e) {
// I guess this could happen. Post commit task failed
// to execute properly.
getLog().warn("POST COMMIT FAILED: ", e);
XAException xae = new XAException("POST COMMIT FAILED");
xae.errorCode = XAException.XAER_RMERR;
xae.initCause(e);
throw xae;
}
}
public Object call() throws Exception {
doPostCommit();
return null;
}
}

View File

@ -64,15 +64,15 @@ public class XATransaction extends Transaction {
checkForPreparedState(onePhase);
doPrePrepare();
setStateFinished();
transactionStore.commit(getTransactionId(), false);
doPostCommit();
transactionStore.commit(getTransactionId(), false, postCommitTask);
waitPostCommitDone(postCommitTask);
break;
case PREPARED_STATE:
// 2 phase commit, work done.
// We would record commit here.
setStateFinished();
transactionStore.commit(getTransactionId(), true);
doPostCommit();
transactionStore.commit(getTransactionId(), true, postCommitTask);
waitPostCommitDone(postCommitTask);
break;
default:
illegalStateTransition("commit");
@ -108,20 +108,6 @@ public class XATransaction extends Transaction {
}
}
private void doPostCommit() throws XAException {
try {
fireAfterCommit();
} catch (Throwable e) {
// I guess this could happen. Post commit task failed
// to execute properly.
LOG.warn("POST COMMIT FAILED: ", e);
XAException xae = new XAException("POST COMMIT FAILED");
xae.errorCode = XAException.XAER_RMERR;
xae.initCause(e);
throw xae;
}
}
public void rollback() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
@ -195,4 +181,9 @@ public class XATransaction extends Transaction {
public TransactionId getTransactionId() {
return xid;
}
@Override
public Log getLog() {
return LOG;
}
}

View File

@ -62,6 +62,7 @@ public class ThreadExplorer
* @param isStarredExp
* (regular expressions with *)
*/
@SuppressWarnings("deprecation")
public static int kill(String threadName, boolean isStarredExp, String motivation)
{
String me = "ThreadExplorer.kill: ";

View File

@ -35,12 +35,15 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.1 $
*/
public class ConnectorXBeanConfigTest extends TestCase {
private static final Log LOG = LogFactory.getLog(ConnectorXBeanConfigTest.class);
protected BrokerService brokerService;
public void testConnectorConfiguredCorrectly() throws Exception {
@ -76,6 +79,7 @@ public class ConnectorXBeanConfigTest extends TestCase {
brokerService.start(true); // force restart
brokerService.waitUntilStarted();
LOG.info("try and connect to restarted broker");
//send and receive a message from a restarted broker
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61636");
Connection conn = factory.createConnection();

View File

@ -596,12 +596,12 @@ public class Journal {
return rc;
}
public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
return loc;
}
public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
return loc;
}