From b6a0959af698935f68d35a9dca0a3253742feec5 Mon Sep 17 00:00:00 2001 From: Shai Erera Date: Mon, 13 Jun 2011 18:07:27 +0000 Subject: [PATCH] LUCENE-3193: add TwoPhaseCommit interface git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1135204 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 4 + .../org/apache/lucene/index/IndexWriter.java | 3 +- .../apache/lucene/util/TwoPhaseCommit.java | 77 ++++++++ .../lucene/util/TwoPhaseCommitTool.java | 167 +++++++++++++++++ .../lucene/util/TestTwoPhaseCommitTool.java | 169 ++++++++++++++++++ 5 files changed, 419 insertions(+), 1 deletion(-) create mode 100644 lucene/src/java/org/apache/lucene/util/TwoPhaseCommit.java create mode 100644 lucene/src/java/org/apache/lucene/util/TwoPhaseCommitTool.java create mode 100644 lucene/src/test/org/apache/lucene/util/TestTwoPhaseCommitTool.java diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 7409f5cc138..a138cd9a6e5 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -503,6 +503,10 @@ New Features * LUCENE-3140: Added experimental FST implementation to Lucene. (Robert Muir, Dawid Weiss, Mike McCandless) +* LUCENE-3193: A new TwoPhaseCommitTool allows running a 2-phase commit + algorithm over objects that implement the new TwoPhaseCommit interface (such + as IndexWriter). (Shai Erera) + Build * LUCENE-1344: Create OSGi bundle using dev-tools/maven. diff --git a/lucene/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/src/java/org/apache/lucene/index/IndexWriter.java index bc2df32aaa9..460b6aaec95 100644 --- a/lucene/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/src/java/org/apache/lucene/index/IndexWriter.java @@ -55,6 +55,7 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.MapBackedSet; +import org.apache.lucene.util.TwoPhaseCommit; /** An IndexWriter creates and maintains an index. @@ -190,7 +191,7 @@ import org.apache.lucene.util.MapBackedSet; * referenced by the "front" of the index). For this, IndexFileDeleter * keeps track of the last non commit checkpoint. */ -public class IndexWriter implements Closeable { +public class IndexWriter implements Closeable, TwoPhaseCommit { /** * Name of the write lock in the index. */ diff --git a/lucene/src/java/org/apache/lucene/util/TwoPhaseCommit.java b/lucene/src/java/org/apache/lucene/util/TwoPhaseCommit.java new file mode 100644 index 00000000000..efd2d699d3a --- /dev/null +++ b/lucene/src/java/org/apache/lucene/util/TwoPhaseCommit.java @@ -0,0 +1,77 @@ +package org.apache.lucene.util; + +import java.io.IOException; +import java.util.Map; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * An interface for implementations that support 2-phase commit. You can use + * {@link TwoPhaseCommitTool} to execute a 2-phase commit algorithm over several + * {@link TwoPhaseCommit}s. + * + * @lucene.experimental + */ +public interface TwoPhaseCommit { + + /** + * The first stage of a 2-phase commit. Implementations should do as much work + * as possible in this method, but avoid actual committing changes. If the + * 2-phase commit fails, {@link #rollback()} is called to discard all changes + * since last successful commit. + */ + public void prepareCommit() throws IOException; + + /** + * Like {@link #commit()}, but takes an additional commit data to be included + * w/ the commit. + *

+ * NOTE: some implementations may not support any custom data to be + * included w/ the commit and may discard it altogether. Consult the actual + * implementation documentation for verifying if this is supported. + * + * @see #prepareCommit() + */ + public void prepareCommit(Map commitData) throws IOException; + + /** + * The second phase of a 2-phase commit. Implementations should ideally do + * very little work in this method (following {@link #prepareCommit()}, and + * after it returns, the caller can assume that the changes were successfully + * committed to the underlying storage. + */ + public void commit() throws IOException; + + /** + * Like {@link #commit()}, but takes an additional commit data to be included + * w/ the commit. + * + * @see #commit() + * @see #prepareCommit(Map) + */ + public void commit(Map commitData) throws IOException; + + /** + * Discards any changes that have occurred since the last commit. In a 2-phase + * commit algorithm, where one of the objects failed to {@link #commit()} or + * {@link #prepareCommit()}, this method is used to roll all other objects + * back to their previous state. + */ + public void rollback() throws IOException; + +} diff --git a/lucene/src/java/org/apache/lucene/util/TwoPhaseCommitTool.java b/lucene/src/java/org/apache/lucene/util/TwoPhaseCommitTool.java new file mode 100644 index 00000000000..b16d746cc97 --- /dev/null +++ b/lucene/src/java/org/apache/lucene/util/TwoPhaseCommitTool.java @@ -0,0 +1,167 @@ +package org.apache.lucene.util; + +import java.io.IOException; +import java.util.Map; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A utility for executing 2-phase commit on several objects. + * + * @see TwoPhaseCommit + * @lucene.experimental + */ +public final class TwoPhaseCommitTool { + + /** + * A wrapper of a {@link TwoPhaseCommit}, which delegates all calls to the + * wrapped object, passing the specified commitData. This object is useful for + * use with {@link TwoPhaseCommitTool#execute(TwoPhaseCommit...)} if one would + * like to store commitData as part of the commit. + */ + public static final class TwoPhaseCommitWrapper implements TwoPhaseCommit { + + private final TwoPhaseCommit tpc; + private final Map commitData; + + public TwoPhaseCommitWrapper(TwoPhaseCommit tpc, Map commitData) { + this.tpc = tpc; + this.commitData = commitData; + } + + @Override + public void prepareCommit() throws IOException { + prepareCommit(commitData); + } + + @Override + public void prepareCommit(Map commitData) throws IOException { + tpc.prepareCommit(this.commitData); + } + + @Override + public void commit() throws IOException { + commit(commitData); + } + + @Override + public void commit(Map commitData) throws IOException { + tpc.commit(this.commitData); + } + + @Override + public void rollback() throws IOException { + tpc.rollback(); + } + } + + /** + * Thrown by {@link TwoPhaseCommitTool#execute(TwoPhaseCommit...)} when an + * object fails to prepareCommit(). + */ + public static class PrepareCommitFailException extends IOException { + + public PrepareCommitFailException(Throwable cause, TwoPhaseCommit obj) { + super("prepareCommit() failed on " + obj); + initCause(cause); + } + + } + + /** + * Thrown by {@link TwoPhaseCommitTool#execute(TwoPhaseCommit...)} when an + * object fails to commit(). + */ + public static class CommitFailException extends IOException { + + public CommitFailException(Throwable cause, TwoPhaseCommit obj) { + super("commit() failed on " + obj); + initCause(cause); + } + + } + + /** rollback all objects, discarding any exceptions that occur. */ + private static void rollback(TwoPhaseCommit... objects) { + for (TwoPhaseCommit tpc : objects) { + // ignore any exception that occurs during rollback - we want to ensure + // all objects are rolled-back. + if (tpc != null) { + try { + tpc.rollback(); + } catch (Throwable t) {} + } + } + } + + /** + * Executes a 2-phase commit algorithm by first + * {@link TwoPhaseCommit#prepareCommit()} all objects and only if all succeed, + * it proceeds with {@link TwoPhaseCommit#commit()}. If any of the objects + * fail on either the preparation or actual commit, it terminates and + * {@link TwoPhaseCommit#rollback()} all of them. + *

+ * NOTE: it may happen that an object fails to commit, after few have + * already successfully committed. This tool will still issue a rollback + * instruction on them as well, but depending on the implementation, it may + * not have any effect. + *

+ * NOTE: if any of the objects are {@code null}, this method simply + * skips over them. + * + * @throws PrepareCommitFailException + * if any of the objects fail to + * {@link TwoPhaseCommit#prepareCommit()} + * @throws CommitFailException + * if any of the objects fail to {@link TwoPhaseCommit#commit()} + */ + public static void execute(TwoPhaseCommit... objects) + throws PrepareCommitFailException, CommitFailException { + TwoPhaseCommit tpc = null; + try { + // first, all should successfully prepareCommit() + for (int i = 0; i < objects.length; i++) { + tpc = objects[i]; + if (tpc != null) { + tpc.prepareCommit(); + } + } + } catch (Throwable t) { + // first object that fails results in rollback all of them and + // throwing an exception. + rollback(objects); + throw new PrepareCommitFailException(t, tpc); + } + + // If all successfully prepareCommit(), attempt the actual commit() + try { + for (int i = 0; i < objects.length; i++) { + tpc = objects[i]; + if (tpc != null) { + tpc.commit(); + } + } + } catch (Throwable t) { + // first object that fails results in rollback all of them and + // throwing an exception. + rollback(objects); + throw new CommitFailException(t, tpc); + } + } + +} diff --git a/lucene/src/test/org/apache/lucene/util/TestTwoPhaseCommitTool.java b/lucene/src/test/org/apache/lucene/util/TestTwoPhaseCommitTool.java new file mode 100644 index 00000000000..b7e528857f7 --- /dev/null +++ b/lucene/src/test/org/apache/lucene/util/TestTwoPhaseCommitTool.java @@ -0,0 +1,169 @@ +package org.apache.lucene.util; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.lucene.util.TwoPhaseCommitTool.TwoPhaseCommitWrapper; + +public class TestTwoPhaseCommitTool extends LuceneTestCase { + + private static class TwoPhaseCommitImpl implements TwoPhaseCommit { + static boolean commitCalled = false; + final boolean failOnPrepare; + final boolean failOnCommit; + final boolean failOnRollback; + boolean rollbackCalled = false; + Map prepareCommitData = null; + Map commitData = null; + + public TwoPhaseCommitImpl(boolean failOnPrepare, boolean failOnCommit, boolean failOnRollback) { + this.failOnPrepare = failOnPrepare; + this.failOnCommit = failOnCommit; + this.failOnRollback = failOnRollback; + } + + @Override + public void prepareCommit() throws IOException { + prepareCommit(null); + } + + @Override + public void prepareCommit(Map commitData) throws IOException { + this.prepareCommitData = commitData; + assertFalse("commit should not have been called before all prepareCommit were", commitCalled); + if (failOnPrepare) { + throw new IOException("failOnPrepare"); + } + } + + @Override + public void commit() throws IOException { + commit(null); + } + + @Override + public void commit(Map commitData) throws IOException { + this.commitData = commitData; + commitCalled = true; + if (failOnCommit) { + throw new RuntimeException("failOnCommit"); + } + } + + @Override + public void rollback() throws IOException { + rollbackCalled = true; + if (failOnRollback) { + throw new Error("failOnRollback"); + } + } + } + + @Override + public void setUp() throws Exception { + super.setUp(); + TwoPhaseCommitImpl.commitCalled = false; // reset count before every test + } + + public void testPrepareThenCommit() throws Exception { + // tests that prepareCommit() is called on all objects before commit() + TwoPhaseCommitImpl[] objects = new TwoPhaseCommitImpl[2]; + for (int i = 0; i < objects.length; i++) { + objects[i] = new TwoPhaseCommitImpl(false, false, false); + } + + // following call will fail if commit() is called before all prepare() were + TwoPhaseCommitTool.execute(objects); + } + + public void testRollback() throws Exception { + // tests that rollback is called if failure occurs at any stage + int numObjects = random.nextInt(8) + 3; // between [3, 10] + TwoPhaseCommitImpl[] objects = new TwoPhaseCommitImpl[numObjects]; + for (int i = 0; i < objects.length; i++) { + boolean failOnPrepare = random.nextBoolean(); + // we should not hit failures on commit usually + boolean failOnCommit = random.nextDouble() < 0.05; + boolean railOnRollback = random.nextBoolean(); + objects[i] = new TwoPhaseCommitImpl(failOnPrepare, failOnCommit, railOnRollback); + } + + boolean anyFailure = false; + try { + TwoPhaseCommitTool.execute(objects); + } catch (Throwable t) { + anyFailure = true; + } + + if (anyFailure) { + // if any failure happened, ensure that rollback was called on all. + for (TwoPhaseCommitImpl tpc : objects) { + assertTrue("rollback was not called while a failure occurred during the 2-phase commit", tpc.rollbackCalled); + } + } + } + + public void testWrapper() throws Exception { + // tests that TwoPhaseCommitWrapper delegates prepare/commit w/ commitData + TwoPhaseCommitImpl impl = new TwoPhaseCommitImpl(false, false, false); + HashMap commitData = new HashMap(); + TwoPhaseCommitWrapper wrapper = new TwoPhaseCommitWrapper(impl, commitData); + + wrapper.prepareCommit(); + assertSame(commitData, impl.prepareCommitData); + + // wrapper should ignore passed commitData + wrapper.prepareCommit(new HashMap()); + assertSame(commitData, impl.prepareCommitData); + + wrapper.commit(); + assertSame(commitData, impl.commitData); + + // wrapper should ignore passed commitData + wrapper.commit(new HashMap()); + assertSame(commitData, impl.commitData); + } + + public void testNullTPCs() throws Exception { + int numObjects = random.nextInt(4) + 3; // between [3, 6] + TwoPhaseCommit[] tpcs = new TwoPhaseCommit[numObjects]; + boolean setNull = false; + for (int i = 0; i < tpcs.length; i++) { + boolean isNull = random.nextDouble() < 0.3; + if (isNull) { + setNull = true; + tpcs[i] = null; + } else { + tpcs[i] = new TwoPhaseCommitImpl(false, false, false); + } + } + + if (!setNull) { + // none of the TPCs were picked to be null, pick one at random + int idx = random.nextInt(numObjects); + tpcs[idx] = null; + } + + // following call would fail if TPCTool won't handle null TPCs properly + TwoPhaseCommitTool.execute(tpcs); + } + +}