LUCENE-3193: add TwoPhaseCommit interface

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1135204 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shai Erera 2011-06-13 18:07:27 +00:00
parent f3225e0153
commit b6a0959af6
5 changed files with 419 additions and 1 deletions

View File

@ -503,6 +503,10 @@ New Features
* LUCENE-3140: Added experimental FST implementation to Lucene.
(Robert Muir, Dawid Weiss, Mike McCandless)
* LUCENE-3193: A new TwoPhaseCommitTool allows running a 2-phase commit
algorithm over objects that implement the new TwoPhaseCommit interface (such
as IndexWriter). (Shai Erera)
Build
* LUCENE-1344: Create OSGi bundle using dev-tools/maven.

View File

@ -55,6 +55,7 @@ import org.apache.lucene.util.Constants;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.MapBackedSet;
import org.apache.lucene.util.TwoPhaseCommit;
/**
An <code>IndexWriter</code> creates and maintains an index.
@ -190,7 +191,7 @@ import org.apache.lucene.util.MapBackedSet;
* referenced by the "front" of the index). For this, IndexFileDeleter
* keeps track of the last non commit checkpoint.
*/
public class IndexWriter implements Closeable {
public class IndexWriter implements Closeable, TwoPhaseCommit {
/**
* Name of the write lock in the index.
*/

View File

@ -0,0 +1,77 @@
package org.apache.lucene.util;
import java.io.IOException;
import java.util.Map;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* An interface for implementations that support 2-phase commit. You can use
* {@link TwoPhaseCommitTool} to execute a 2-phase commit algorithm over several
* {@link TwoPhaseCommit}s.
*
* @lucene.experimental
*/
public interface TwoPhaseCommit {
/**
* The first stage of a 2-phase commit. Implementations should do as much work
* as possible in this method, but avoid actual committing changes. If the
* 2-phase commit fails, {@link #rollback()} is called to discard all changes
* since last successful commit.
*/
public void prepareCommit() throws IOException;
/**
* Like {@link #commit()}, but takes an additional commit data to be included
* w/ the commit.
* <p>
* <b>NOTE:</b> some implementations may not support any custom data to be
* included w/ the commit and may discard it altogether. Consult the actual
* implementation documentation for verifying if this is supported.
*
* @see #prepareCommit()
*/
public void prepareCommit(Map<String, String> commitData) throws IOException;
/**
* The second phase of a 2-phase commit. Implementations should ideally do
* very little work in this method (following {@link #prepareCommit()}, and
* after it returns, the caller can assume that the changes were successfully
* committed to the underlying storage.
*/
public void commit() throws IOException;
/**
* Like {@link #commit()}, but takes an additional commit data to be included
* w/ the commit.
*
* @see #commit()
* @see #prepareCommit(Map)
*/
public void commit(Map<String, String> commitData) throws IOException;
/**
* Discards any changes that have occurred since the last commit. In a 2-phase
* commit algorithm, where one of the objects failed to {@link #commit()} or
* {@link #prepareCommit()}, this method is used to roll all other objects
* back to their previous state.
*/
public void rollback() throws IOException;
}

View File

@ -0,0 +1,167 @@
package org.apache.lucene.util;
import java.io.IOException;
import java.util.Map;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* A utility for executing 2-phase commit on several objects.
*
* @see TwoPhaseCommit
* @lucene.experimental
*/
public final class TwoPhaseCommitTool {
/**
* A wrapper of a {@link TwoPhaseCommit}, which delegates all calls to the
* wrapped object, passing the specified commitData. This object is useful for
* use with {@link TwoPhaseCommitTool#execute(TwoPhaseCommit...)} if one would
* like to store commitData as part of the commit.
*/
public static final class TwoPhaseCommitWrapper implements TwoPhaseCommit {
private final TwoPhaseCommit tpc;
private final Map<String, String> commitData;
public TwoPhaseCommitWrapper(TwoPhaseCommit tpc, Map<String, String> commitData) {
this.tpc = tpc;
this.commitData = commitData;
}
@Override
public void prepareCommit() throws IOException {
prepareCommit(commitData);
}
@Override
public void prepareCommit(Map<String, String> commitData) throws IOException {
tpc.prepareCommit(this.commitData);
}
@Override
public void commit() throws IOException {
commit(commitData);
}
@Override
public void commit(Map<String, String> commitData) throws IOException {
tpc.commit(this.commitData);
}
@Override
public void rollback() throws IOException {
tpc.rollback();
}
}
/**
* Thrown by {@link TwoPhaseCommitTool#execute(TwoPhaseCommit...)} when an
* object fails to prepareCommit().
*/
public static class PrepareCommitFailException extends IOException {
public PrepareCommitFailException(Throwable cause, TwoPhaseCommit obj) {
super("prepareCommit() failed on " + obj);
initCause(cause);
}
}
/**
* Thrown by {@link TwoPhaseCommitTool#execute(TwoPhaseCommit...)} when an
* object fails to commit().
*/
public static class CommitFailException extends IOException {
public CommitFailException(Throwable cause, TwoPhaseCommit obj) {
super("commit() failed on " + obj);
initCause(cause);
}
}
/** rollback all objects, discarding any exceptions that occur. */
private static void rollback(TwoPhaseCommit... objects) {
for (TwoPhaseCommit tpc : objects) {
// ignore any exception that occurs during rollback - we want to ensure
// all objects are rolled-back.
if (tpc != null) {
try {
tpc.rollback();
} catch (Throwable t) {}
}
}
}
/**
* Executes a 2-phase commit algorithm by first
* {@link TwoPhaseCommit#prepareCommit()} all objects and only if all succeed,
* it proceeds with {@link TwoPhaseCommit#commit()}. If any of the objects
* fail on either the preparation or actual commit, it terminates and
* {@link TwoPhaseCommit#rollback()} all of them.
* <p>
* <b>NOTE:</b> it may happen that an object fails to commit, after few have
* already successfully committed. This tool will still issue a rollback
* instruction on them as well, but depending on the implementation, it may
* not have any effect.
* <p>
* <b>NOTE:</b> if any of the objects are {@code null}, this method simply
* skips over them.
*
* @throws PrepareCommitFailException
* if any of the objects fail to
* {@link TwoPhaseCommit#prepareCommit()}
* @throws CommitFailException
* if any of the objects fail to {@link TwoPhaseCommit#commit()}
*/
public static void execute(TwoPhaseCommit... objects)
throws PrepareCommitFailException, CommitFailException {
TwoPhaseCommit tpc = null;
try {
// first, all should successfully prepareCommit()
for (int i = 0; i < objects.length; i++) {
tpc = objects[i];
if (tpc != null) {
tpc.prepareCommit();
}
}
} catch (Throwable t) {
// first object that fails results in rollback all of them and
// throwing an exception.
rollback(objects);
throw new PrepareCommitFailException(t, tpc);
}
// If all successfully prepareCommit(), attempt the actual commit()
try {
for (int i = 0; i < objects.length; i++) {
tpc = objects[i];
if (tpc != null) {
tpc.commit();
}
}
} catch (Throwable t) {
// first object that fails results in rollback all of them and
// throwing an exception.
rollback(objects);
throw new CommitFailException(t, tpc);
}
}
}

View File

@ -0,0 +1,169 @@
package org.apache.lucene.util;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.util.TwoPhaseCommitTool.TwoPhaseCommitWrapper;
public class TestTwoPhaseCommitTool extends LuceneTestCase {
private static class TwoPhaseCommitImpl implements TwoPhaseCommit {
static boolean commitCalled = false;
final boolean failOnPrepare;
final boolean failOnCommit;
final boolean failOnRollback;
boolean rollbackCalled = false;
Map<String, String> prepareCommitData = null;
Map<String, String> commitData = null;
public TwoPhaseCommitImpl(boolean failOnPrepare, boolean failOnCommit, boolean failOnRollback) {
this.failOnPrepare = failOnPrepare;
this.failOnCommit = failOnCommit;
this.failOnRollback = failOnRollback;
}
@Override
public void prepareCommit() throws IOException {
prepareCommit(null);
}
@Override
public void prepareCommit(Map<String, String> commitData) throws IOException {
this.prepareCommitData = commitData;
assertFalse("commit should not have been called before all prepareCommit were", commitCalled);
if (failOnPrepare) {
throw new IOException("failOnPrepare");
}
}
@Override
public void commit() throws IOException {
commit(null);
}
@Override
public void commit(Map<String, String> commitData) throws IOException {
this.commitData = commitData;
commitCalled = true;
if (failOnCommit) {
throw new RuntimeException("failOnCommit");
}
}
@Override
public void rollback() throws IOException {
rollbackCalled = true;
if (failOnRollback) {
throw new Error("failOnRollback");
}
}
}
@Override
public void setUp() throws Exception {
super.setUp();
TwoPhaseCommitImpl.commitCalled = false; // reset count before every test
}
public void testPrepareThenCommit() throws Exception {
// tests that prepareCommit() is called on all objects before commit()
TwoPhaseCommitImpl[] objects = new TwoPhaseCommitImpl[2];
for (int i = 0; i < objects.length; i++) {
objects[i] = new TwoPhaseCommitImpl(false, false, false);
}
// following call will fail if commit() is called before all prepare() were
TwoPhaseCommitTool.execute(objects);
}
public void testRollback() throws Exception {
// tests that rollback is called if failure occurs at any stage
int numObjects = random.nextInt(8) + 3; // between [3, 10]
TwoPhaseCommitImpl[] objects = new TwoPhaseCommitImpl[numObjects];
for (int i = 0; i < objects.length; i++) {
boolean failOnPrepare = random.nextBoolean();
// we should not hit failures on commit usually
boolean failOnCommit = random.nextDouble() < 0.05;
boolean railOnRollback = random.nextBoolean();
objects[i] = new TwoPhaseCommitImpl(failOnPrepare, failOnCommit, railOnRollback);
}
boolean anyFailure = false;
try {
TwoPhaseCommitTool.execute(objects);
} catch (Throwable t) {
anyFailure = true;
}
if (anyFailure) {
// if any failure happened, ensure that rollback was called on all.
for (TwoPhaseCommitImpl tpc : objects) {
assertTrue("rollback was not called while a failure occurred during the 2-phase commit", tpc.rollbackCalled);
}
}
}
public void testWrapper() throws Exception {
// tests that TwoPhaseCommitWrapper delegates prepare/commit w/ commitData
TwoPhaseCommitImpl impl = new TwoPhaseCommitImpl(false, false, false);
HashMap<String, String> commitData = new HashMap<String, String>();
TwoPhaseCommitWrapper wrapper = new TwoPhaseCommitWrapper(impl, commitData);
wrapper.prepareCommit();
assertSame(commitData, impl.prepareCommitData);
// wrapper should ignore passed commitData
wrapper.prepareCommit(new HashMap<String, String>());
assertSame(commitData, impl.prepareCommitData);
wrapper.commit();
assertSame(commitData, impl.commitData);
// wrapper should ignore passed commitData
wrapper.commit(new HashMap<String, String>());
assertSame(commitData, impl.commitData);
}
public void testNullTPCs() throws Exception {
int numObjects = random.nextInt(4) + 3; // between [3, 6]
TwoPhaseCommit[] tpcs = new TwoPhaseCommit[numObjects];
boolean setNull = false;
for (int i = 0; i < tpcs.length; i++) {
boolean isNull = random.nextDouble() < 0.3;
if (isNull) {
setNull = true;
tpcs[i] = null;
} else {
tpcs[i] = new TwoPhaseCommitImpl(false, false, false);
}
}
if (!setNull) {
// none of the TPCs were picked to be null, pick one at random
int idx = random.nextInt(numObjects);
tpcs[idx] = null;
}
// following call would fail if TPCTool won't handle null TPCs properly
TwoPhaseCommitTool.execute(tpcs);
}
}