diff --git a/dev-tools/maven/lucene/replicator/pom.xml.template b/dev-tools/maven/lucene/replicator/pom.xml.template
new file mode 100644
index 00000000000..e5382354ba9
--- /dev/null
+++ b/dev-tools/maven/lucene/replicator/pom.xml.template
@@ -0,0 +1,75 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.lucene</groupId>
+    <artifactId>lucene-parent</artifactId>
+    <version>@version@</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <groupId>org.apache.lucene</groupId>
+  <artifactId>lucene-replicator</artifactId>
+  <packaging>jar</packaging>
+  <name>Lucene Replicator</name>
+  <description>Lucene Replicator Module</description>
+  <properties>
+    <module-directory>lucene/replicator</module-directory>
+    <relative-top-level>../../..</relative-top-level>
+    <module-path>${relative-top-level}/${module-directory}</module-path>
+  </properties>
+  <scm>
+    <connection>scm:svn:${vc-anonymous-base-url}/${module-directory}</connection>
+    <developerConnection>scm:svn:${vc-dev-base-url}/${module-directory}</developerConnection>
+    <url>${vc-browse-base-url}/${module-directory}</url>
+  </scm>
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>lucene-test-framework</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>lucene-facet</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <sourceDirectory>${module-path}/src/java</sourceDirectory>
+    <testSourceDirectory>${module-path}/src/test</testSourceDirectory>
+    <testResources>
+      <testResource>
+        <directory>${project.build.testSourceDirectory}</directory>
+        <excludes>
+          <exclude>**/*.java</exclude>
+        </excludes>
+      </testResource>
+    </testResources>
+  </build>
+</project>
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 21599190171..a40ba38e355 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -181,6 +181,9 @@ New Features
* LUCENE-4979: LiveFieldValues can work with any ReferenceManager, not
  just ReferenceManager<IndexSearcher>. (Mike McCandless)
+* LUCENE-4975: Added a new Replicator module which can replicate index
+ revisions between server and client. (Shai Erera, Mike McCandless)
+
Build
* LUCENE-4987: Upgrade randomized testing to version 2.0.10:
diff --git a/lucene/build.xml b/lucene/build.xml
index e0af8ffba77..0c16b7f5ac4 100644
--- a/lucene/build.xml
+++ b/lucene/build.xml
@@ -160,7 +160,13 @@
-
+
+
+
+
+
+
+
diff --git a/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java b/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
index 2e84744bf2e..788b157c8da 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
@@ -327,7 +327,7 @@ public abstract class DirectoryReader extends BaseCompositeReader
// corrupt first commit, but it's too deadly to make
// this logic "smarter" and risk accidentally returning
// false due to various cases like file description
- // exhaustion, access denited, etc., because in that
+ // exhaustion, access denied, etc., because in that
// case IndexWriter may delete the entire index. It's
// safer to err towards "index exists" than try to be
// smart about detecting not-yet-fully-committed or
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexFileNames.java b/lucene/core/src/java/org/apache/lucene/index/IndexFileNames.java
index b74b18c5948..058f718a525 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexFileNames.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexFileNames.java
@@ -199,7 +199,10 @@ public final class IndexFileNames {
return filename;
}
- // All files created by codecs much match this pattern (we
- // check this in SegmentInfo.java):
- static final Pattern CODEC_FILE_PATTERN = Pattern.compile("_[a-z0-9]+(_.*)?\\..*");
+ /**
+ * All files created by codecs must match this pattern (checked in
+ * SegmentInfo).
+ */
+ public static final Pattern CODEC_FILE_PATTERN = Pattern.compile("_[a-z0-9]+(_.*)?\\..*");
+
}
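
For illustration only (not part of the patch), a quick sketch of what the now-public pattern does and does not match; the file names are hypothetical:

    // per-segment codec file such as "_0_Lucene41_0.doc" (hypothetical name): matches
    boolean codecFile = IndexFileNames.CODEC_FILE_PATTERN.matcher("_0_Lucene41_0.doc").matches();  // true
    // commit metadata files are not codec files
    boolean segmentsFile = IndexFileNames.CODEC_FILE_PATTERN.matcher("segments_1").matches();      // false
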
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
index 203e7aeba71..7851e792b97 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
@@ -242,6 +242,39 @@ public final class SegmentInfos implements Cloneable, Iterable
+ /**
+ * Writes the {@link IndexFileNames#SEGMENTS_GEN} file to the given directory.
+ * <p>
+ * NOTE: this is an internal utility which is kept public so that it's
+ * accessible by code from other packages. You should avoid calling this
+ * method unless you're absolutely sure what you're doing!
+ *
+ * @lucene.internal
+ */
+ public static void writeSegmentsGen(Directory dir, long generation) {
+ try {
+ IndexOutput genOutput = dir.createOutput(IndexFileNames.SEGMENTS_GEN, IOContext.READONCE);
+ try {
+ genOutput.writeInt(FORMAT_SEGMENTS_GEN_CURRENT);
+ genOutput.writeLong(generation);
+ genOutput.writeLong(generation);
+ } finally {
+ genOutput.close();
+ dir.sync(Collections.singleton(IndexFileNames.SEGMENTS_GEN));
+ }
+ } catch (Throwable t) {
+ // It's OK if we fail to write this file since it's
+ // used only as one of the retry fallbacks.
+ try {
+ dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
+ } catch (Throwable t2) {
+ // Ignore; this file is only used in a retry
+ // fallback on init.
+ }
+ }
+ }
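
For illustration only, a hedged sketch of how a caller (such as the replication handler added later in this patch) might use the new utility; the directory variable and segments file name are assumptions:

    // after a new segments_N file has been copied and fsync'd into dir:
    String segmentsFile = "segments_5";                                    // hypothetical
    long gen = SegmentInfos.generationFromSegmentsFileName(segmentsFile);  // = 5
    SegmentInfos.writeSegmentsGen(dir, gen);  // best effort: on failure it only deletes segments.gen
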
/**
* Get the next segments_N filename that will be written.
@@ -848,27 +881,7 @@ public final class SegmentInfos implements Cloneable, Iterable
+
+
+
+
+Eclipse.org Software User Agreement
+
+
Eclipse Foundation Software User Agreement
+
March 17, 2005
+
+
Usage Of Content
+
+
THE ECLIPSE FOUNDATION MAKES AVAILABLE SOFTWARE, DOCUMENTATION, INFORMATION AND/OR OTHER MATERIALS FOR OPEN SOURCE PROJECTS
+ (COLLECTIVELY "CONTENT"). USE OF THE CONTENT IS GOVERNED BY THE TERMS AND CONDITIONS OF THIS AGREEMENT AND/OR THE TERMS AND
+ CONDITIONS OF LICENSE AGREEMENTS OR NOTICES INDICATED OR REFERENCED BELOW. BY USING THE CONTENT, YOU AGREE THAT YOUR USE
+ OF THE CONTENT IS GOVERNED BY THIS AGREEMENT AND/OR THE TERMS AND CONDITIONS OF ANY APPLICABLE LICENSE AGREEMENTS OR
+ NOTICES INDICATED OR REFERENCED BELOW. IF YOU DO NOT AGREE TO THE TERMS AND CONDITIONS OF THIS AGREEMENT AND THE TERMS AND
+ CONDITIONS OF ANY APPLICABLE LICENSE AGREEMENTS OR NOTICES INDICATED OR REFERENCED BELOW, THEN YOU MAY NOT USE THE CONTENT.
+
+
Applicable Licenses
+
+
Unless otherwise indicated, all Content made available by the
+Eclipse Foundation is provided to you under the terms and conditions of
+the Eclipse Public License Version 1.0 ("EPL"). A copy of the EPL is
+provided with this Content and is also available at http://www.eclipse.org/legal/epl-v10.html.
+ For purposes of the EPL, "Program" will mean the Content.
+
+
Content includes, but is not limited to, source code, object code,
+documentation and other files maintained in the Eclipse.org CVS
+repository ("Repository") in CVS modules ("Modules") and made available
+as downloadable archives ("Downloads").
+
+
+
Content may be structured and packaged into modules to
+facilitate delivering, extending, and upgrading the Content. Typical
+modules may include plug-ins ("Plug-ins"), plug-in fragments
+("Fragments"), and features ("Features").
+
Each Plug-in or Fragment may be packaged as a sub-directory or JAR (Java™ ARchive) in a directory named "plugins".
+
A
+Feature is a bundle of one or more Plug-ins and/or Fragments and
+associated material. Each Feature may be packaged as a sub-directory in
+a directory named "features". Within a Feature, files named
+"feature.xml" may contain a list of the names and version numbers of
+the Plug-ins and/or Fragments associated with that Feature.
+
Features
+may also include other Features ("Included Features"). Within a
+Feature, files named "feature.xml" may contain a list of the names and
+version numbers of Included Features.
+
+
+
The terms and conditions governing Plug-ins and Fragments should be
+contained in files named "about.html" ("Abouts"). The terms and
+conditions governing Features and
+Included Features should be contained in files named "license.html"
+("Feature Licenses"). Abouts and Feature Licenses may be located in any
+directory of a Download or Module
+including, but not limited to the following locations:
+
+
+
The top-level (root) directory
+
Plug-in and Fragment directories
+
Inside Plug-ins and Fragments packaged as JARs
+
Sub-directories of the directory named "src" of certain Plug-ins
+
Feature directories
+
+
+
Note: if a Feature made available by the Eclipse Foundation is
+installed using the Eclipse Update Manager, you must agree to a license
+("Feature Update License") during the
+installation process. If the Feature contains Included Features, the
+Feature Update License should either provide you with the terms and
+conditions governing the Included Features or
+inform you where you can locate them. Feature Update Licenses may be
+found in the "license" property of files named "feature.properties"
+found within a Feature.
+Such Abouts, Feature Licenses, and Feature Update Licenses contain the
+terms and conditions (or references to such terms and conditions) that
+govern your use of the associated Content in
+that directory.
+
+
THE ABOUTS, FEATURE LICENSES, AND FEATURE UPDATE LICENSES MAY REFER
+TO THE EPL OR OTHER LICENSE AGREEMENTS, NOTICES OR TERMS AND
+CONDITIONS. SOME OF THESE
+OTHER LICENSE AGREEMENTS MAY INCLUDE (BUT ARE NOT LIMITED TO):
IT IS YOUR OBLIGATION TO READ AND ACCEPT ALL SUCH TERMS AND
+CONDITIONS PRIOR TO USE OF THE CONTENT. If no About, Feature License,
+or Feature Update License is provided, please
+contact the Eclipse Foundation to determine what terms and conditions
+govern that particular Content.
+
+
Cryptography
+
+
Content may contain encryption software. The country in which you
+are currently may have restrictions on the import, possession, and use,
+and/or re-export to another country, of encryption software. BEFORE
+using any encryption software, please check the country's laws,
+regulations and policies concerning the import, possession, or use, and
+re-export of encryption software, to see if this is permitted.
+
+Java and all Java-based trademarks are trademarks of Sun Microsystems, Inc. in the United States, other countries, or both.
+
\ No newline at end of file
diff --git a/lucene/licenses/jetty-continuation-8.1.10.v20130312.jar.sha1 b/lucene/licenses/jetty-continuation-8.1.10.v20130312.jar.sha1
new file mode 100644
index 00000000000..40d32c2e00f
--- /dev/null
+++ b/lucene/licenses/jetty-continuation-8.1.10.v20130312.jar.sha1
@@ -0,0 +1 @@
+c0e26574ddcac7a86486f19a8b3782657acfd961
diff --git a/lucene/licenses/jetty-http-8.1.10.v20130312.jar.sha1 b/lucene/licenses/jetty-http-8.1.10.v20130312.jar.sha1
new file mode 100644
index 00000000000..7cc91649c4e
--- /dev/null
+++ b/lucene/licenses/jetty-http-8.1.10.v20130312.jar.sha1
@@ -0,0 +1 @@
+d9eb53007e04d6338f12f3ded60fad1f7bfcb40e
diff --git a/lucene/licenses/jetty-io-8.1.10.v20130312.jar.sha1 b/lucene/licenses/jetty-io-8.1.10.v20130312.jar.sha1
new file mode 100644
index 00000000000..2ba8e66bf24
--- /dev/null
+++ b/lucene/licenses/jetty-io-8.1.10.v20130312.jar.sha1
@@ -0,0 +1 @@
+e829c768f2b9de5d9fae3bc0aba3996bd0344f56
diff --git a/lucene/licenses/jetty-server-8.1.10.v20130312.jar.sha1 b/lucene/licenses/jetty-server-8.1.10.v20130312.jar.sha1
new file mode 100644
index 00000000000..2674565b8f6
--- /dev/null
+++ b/lucene/licenses/jetty-server-8.1.10.v20130312.jar.sha1
@@ -0,0 +1 @@
+13ca9587bc1645f8fac89454b15252a2ad5bdcf5
diff --git a/lucene/licenses/jetty-servlet-8.1.10.v20130312.jar.sha1 b/lucene/licenses/jetty-servlet-8.1.10.v20130312.jar.sha1
new file mode 100644
index 00000000000..ac4faaac6ca
--- /dev/null
+++ b/lucene/licenses/jetty-servlet-8.1.10.v20130312.jar.sha1
@@ -0,0 +1 @@
+98f8029fe7236e9c66381c04f292b5319f47ca84
diff --git a/lucene/licenses/jetty-util-8.1.10.v20130312.jar.sha1 b/lucene/licenses/jetty-util-8.1.10.v20130312.jar.sha1
new file mode 100644
index 00000000000..cff356ddd4e
--- /dev/null
+++ b/lucene/licenses/jetty-util-8.1.10.v20130312.jar.sha1
@@ -0,0 +1 @@
+d198a8ad8ea20b4fb74c781175c48500ec2b8b7a
diff --git a/lucene/licenses/servlet-api-3.0.jar.sha1 b/lucene/licenses/servlet-api-3.0.jar.sha1
new file mode 100644
index 00000000000..749a2c29b35
--- /dev/null
+++ b/lucene/licenses/servlet-api-3.0.jar.sha1
@@ -0,0 +1 @@
+0aaaa85845fb5c59da00193f06b8e5278d8bf3f8
diff --git a/lucene/licenses/slf4j-LICENSE-BSD_LIKE.txt b/lucene/licenses/slf4j-LICENSE-BSD_LIKE.txt
new file mode 100644
index 00000000000..f5ecafa0074
--- /dev/null
+++ b/lucene/licenses/slf4j-LICENSE-BSD_LIKE.txt
@@ -0,0 +1,21 @@
+Copyright (c) 2004-2008 QOS.ch
+All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/lucene/licenses/slf4j-NOTICE.txt b/lucene/licenses/slf4j-NOTICE.txt
new file mode 100644
index 00000000000..cf438946ad9
--- /dev/null
+++ b/lucene/licenses/slf4j-NOTICE.txt
@@ -0,0 +1,25 @@
+=========================================================================
+== SLF4J Notice -- http://www.slf4j.org/license.html ==
+=========================================================================
+
+Copyright (c) 2004-2008 QOS.ch
+All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/lucene/licenses/slf4j-api-1.6.6.jar.sha1 b/lucene/licenses/slf4j-api-1.6.6.jar.sha1
new file mode 100644
index 00000000000..e2e47d0d468
--- /dev/null
+++ b/lucene/licenses/slf4j-api-1.6.6.jar.sha1
@@ -0,0 +1 @@
+ce53b0a0e2cfbb27e8a59d38f79a18a5c6a8d2b0
diff --git a/lucene/module-build.xml b/lucene/module-build.xml
index a35c0b25d3a..572de31e1b0 100644
--- a/lucene/module-build.xml
+++ b/lucene/module-build.xml
@@ -220,7 +220,29 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/lucene/replicator/build.xml b/lucene/replicator/build.xml
new file mode 100644
index 00000000000..3786902a6ea
--- /dev/null
+++ b/lucene/replicator/build.xml
@@ -0,0 +1,49 @@
+
+
+
+
+
+ Files replication utility
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/lucene/replicator/ivy.xml b/lucene/replicator/ivy.xml
new file mode 100644
index 00000000000..fe3bd3459a1
--- /dev/null
+++ b/lucene/replicator/ivy.xml
@@ -0,0 +1,50 @@
+
+
+]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java
new file mode 100755
index 00000000000..83aba75aeb9
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyReplicationHandler.java
@@ -0,0 +1,191 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.InfoStream;
+
+/**
+ * A {@link ReplicationHandler} for replication of an index and taxonomy pair.
+ * See {@link IndexReplicationHandler} for more detail. This handler ensures
+ * that the search and taxonomy indexes are replicated in a consistent way.
+ *
+ * NOTE: if you intend to recreate a taxonomy index, you should make sure
+ * to reopen an IndexSearcher and TaxonomyReader pair via the provided callback,
+ * to guarantee that both indexes are in sync. This handler does not prevent
+ * replicating such index and taxonomy pairs, and if they are reopened by a
+ * different thread, unexpected errors can occur, as well as inconsistency
+ * between the taxonomy and index readers.
+ *
+ * @see IndexReplicationHandler
+ *
+ * @lucene.experimental
+ */
+public class IndexAndTaxonomyReplicationHandler implements ReplicationHandler {
+
+ /**
+ * The component used to log messages to the {@link InfoStream#getDefault()
+ * default} {@link InfoStream}.
+ */
+ public static final String INFO_STREAM_COMPONENT = "IndexAndTaxonomyReplicationHandler";
+
+ private final Directory indexDir;
+ private final Directory taxoDir;
+ private final Callable<Boolean> callback;
+
+ private volatile Map<String,List<RevisionFile>> currentRevisionFiles;
+ private volatile String currentVersion;
+ private volatile InfoStream infoStream = InfoStream.getDefault();
+
+ /**
+ * Constructor with the given index and taxonomy directories and a callback to
+ * notify when the indexes have been updated.
+ */
+ public IndexAndTaxonomyReplicationHandler(Directory indexDir, Directory taxoDir, Callable<Boolean> callback)
+ throws IOException {
+ this.callback = callback;
+ this.indexDir = indexDir;
+ this.taxoDir = taxoDir;
+ currentRevisionFiles = null;
+ currentVersion = null;
+ final boolean indexExists = DirectoryReader.indexExists(indexDir);
+ final boolean taxoExists = DirectoryReader.indexExists(taxoDir);
+ if (indexExists != taxoExists) {
+ throw new IllegalStateException("search and taxonomy indexes must either both exist or not: index=" + indexExists
+ + " taxo=" + taxoExists);
+ }
+ if (indexExists) { // both indexes exist
+ final IndexCommit indexCommit = IndexReplicationHandler.getLastCommit(indexDir);
+ final IndexCommit taxoCommit = IndexReplicationHandler.getLastCommit(taxoDir);
+ currentRevisionFiles = IndexAndTaxonomyRevision.revisionFiles(indexCommit, taxoCommit);
+ currentVersion = IndexAndTaxonomyRevision.revisionVersion(indexCommit, taxoCommit);
+ final InfoStream infoStream = InfoStream.getDefault();
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion
+ + " currentRevisionFiles=" + currentRevisionFiles);
+ infoStream.message(INFO_STREAM_COMPONENT, "constructor(): indexCommit=" + indexCommit
+ + " taxoCommit=" + taxoCommit);
+ }
+ }
+ }
+
+ @Override
+ public String currentVersion() {
+ return currentVersion;
+ }
+
+ @Override
+ public Map<String,List<RevisionFile>> currentRevisionFiles() {
+ return currentRevisionFiles;
+ }
+
+ @Override
+ public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles,
+     Map<String,List<String>> copiedFiles, Map<String,Directory> sourceDirectory) throws IOException {
+   Directory taxoClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
+   Directory indexClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
+   List<String> taxoFiles = copiedFiles.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
+   List<String> indexFiles = copiedFiles.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
+ String taxoSegmentsFile = IndexReplicationHandler.getSegmentsFile(taxoFiles, true);
+ String indexSegmentsFile = IndexReplicationHandler.getSegmentsFile(indexFiles, false);
+
+ boolean success = false;
+ try {
+ // copy taxonomy files before index files
+ IndexReplicationHandler.copyFiles(taxoClientDir, taxoDir, taxoFiles);
+ IndexReplicationHandler.copyFiles(indexClientDir, indexDir, indexFiles);
+
+ // fsync all copied files (except segmentsFile)
+ if (!taxoFiles.isEmpty()) {
+ taxoDir.sync(taxoFiles);
+ }
+ indexDir.sync(indexFiles);
+
+ // now copy and fsync segmentsFile, taxonomy first because it is ok if a
+ // reader sees a more advanced taxonomy than the index.
+ if (taxoSegmentsFile != null) {
+ taxoClientDir.copy(taxoDir, taxoSegmentsFile, taxoSegmentsFile, IOContext.READONCE);
+ }
+ indexClientDir.copy(indexDir, indexSegmentsFile, indexSegmentsFile, IOContext.READONCE);
+
+ if (taxoSegmentsFile != null) {
+ taxoDir.sync(Collections.singletonList(taxoSegmentsFile));
+ }
+ indexDir.sync(Collections.singletonList(indexSegmentsFile));
+
+ success = true;
+ } finally {
+ if (!success) {
+ taxoFiles.add(taxoSegmentsFile); // add it back so it gets deleted too
+ IndexReplicationHandler.cleanupFilesOnFailure(taxoDir, taxoFiles);
+ indexFiles.add(indexSegmentsFile); // add it back so it gets deleted too
+ IndexReplicationHandler.cleanupFilesOnFailure(indexDir, indexFiles);
+ }
+ }
+
+ // all files have been successfully copied + sync'd. update the handler's state
+ currentRevisionFiles = revisionFiles;
+ currentVersion = version;
+
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
+ + " currentRevisionFiles=" + currentRevisionFiles);
+ }
+
+ // update the segments.gen file
+ IndexReplicationHandler.writeSegmentsGen(taxoSegmentsFile, taxoDir);
+ IndexReplicationHandler.writeSegmentsGen(indexSegmentsFile, indexDir);
+
+ // Cleanup the index directory from old and unused index files.
+ // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
+ // side-effects, e.g. if it hits sudden IO errors while opening the index
+ // (and can end up deleting the entire index). It is not our job to protect
+ // against those errors, app will probably hit them elsewhere.
+ IndexReplicationHandler.cleanupOldIndexFiles(indexDir, indexSegmentsFile);
+ IndexReplicationHandler.cleanupOldIndexFiles(taxoDir, taxoSegmentsFile);
+
+ // successfully updated the index, notify the callback that the index is
+ // ready.
+ if (callback != null) {
+ try {
+ callback.call();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /** Sets the {@link InfoStream} to use for logging messages. */
+ public void setInfoStream(InfoStream infoStream) {
+ if (infoStream == null) {
+ infoStream = InfoStream.NO_OUTPUT;
+ }
+ this.infoStream = infoStream;
+ }
+
+}
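
A minimal consumer-side sketch (not part of the patch). The Callable<Boolean> callback type and the directory paths are assumptions; the handler is then passed to a ReplicationClient, which is not shown in this section of the diff:

    Directory indexDir = FSDirectory.open(new File("/replica/index"));  // hypothetical path
    Directory taxoDir  = FSDirectory.open(new File("/replica/taxo"));   // hypothetical path
    IndexAndTaxonomyReplicationHandler handler =
        new IndexAndTaxonomyReplicationHandler(indexDir, taxoDir, new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            // a new commit is visible in both directories; reopen the
            // IndexSearcher + TaxonomyReader pair together so they stay in sync
            return true;
          }
        });
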
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java
new file mode 100755
index 00000000000..dbad317cf44
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexAndTaxonomyRevision.java
@@ -0,0 +1,219 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
+import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+/**
+ * A {@link Revision} of a single index and taxonomy pair, which comprises
+ * the list of files from both indexes. This revision should be used whenever a
+ * pair of search and taxonomy indexes need to be replicated together to
+ * guarantee consistency of both on the replicating (client) side.
+ *
+ * @see IndexRevision
+ *
+ * @lucene.experimental
+ */
+public class IndexAndTaxonomyRevision implements Revision {
+
+ /**
+ * A {@link DirectoryTaxonomyWriter} which sets the underlying
+ * {@link IndexWriter}'s {@link IndexDeletionPolicy} to
+ * {@link SnapshotDeletionPolicy}.
+ */
+ public static final class SnapshotDirectoryTaxonomyWriter extends DirectoryTaxonomyWriter {
+
+ private SnapshotDeletionPolicy sdp;
+ private IndexWriter writer;
+
+ /**
+ * @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory,
+ * IndexWriterConfig.OpenMode, TaxonomyWriterCache)
+ */
+ public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode, TaxonomyWriterCache cache)
+ throws IOException {
+ super(directory, openMode, cache);
+ }
+
+ /** @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory, IndexWriterConfig.OpenMode) */
+ public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode) throws IOException {
+ super(directory, openMode);
+ }
+
+ /** @see DirectoryTaxonomyWriter#DirectoryTaxonomyWriter(Directory) */
+ public SnapshotDirectoryTaxonomyWriter(Directory d) throws IOException {
+ super(d);
+ }
+
+ @Override
+ protected IndexWriterConfig createIndexWriterConfig(OpenMode openMode) {
+ IndexWriterConfig conf = super.createIndexWriterConfig(openMode);
+ conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+ return conf;
+ }
+
+ @Override
+ protected IndexWriter openIndexWriter(Directory directory, IndexWriterConfig config) throws IOException {
+ writer = super.openIndexWriter(directory, config);
+ // must set it here because IndexWriter clones the config
+ sdp = (SnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
+ return writer;
+ }
+
+ /** Returns the {@link SnapshotDeletionPolicy} used by the underlying {@link IndexWriter}. */
+ public SnapshotDeletionPolicy getDeletionPolicy() {
+ return sdp;
+ }
+
+ /** Returns the {@link IndexWriter} used by this {@link DirectoryTaxonomyWriter}. */
+ public IndexWriter getIndexWriter() {
+ return writer;
+ }
+
+ }
+
+ private static final int RADIX = 16;
+
+ public static final String INDEX_SOURCE = "index";
+ public static final String TAXONOMY_SOURCE = "taxo";
+
+ private final IndexWriter indexWriter;
+ private final SnapshotDirectoryTaxonomyWriter taxoWriter;
+ private final IndexCommit indexCommit, taxoCommit;
+ private final SnapshotDeletionPolicy indexSDP, taxoSDP;
+ private final String version;
+ private final Map<String,List<RevisionFile>> sourceFiles;
+
+ /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */
+ public static Map<String,List<RevisionFile>> revisionFiles(IndexCommit indexCommit, IndexCommit taxoCommit)
+     throws IOException {
+   HashMap<String,List<RevisionFile>> files = new HashMap<String,List<RevisionFile>>();
+ files.put(INDEX_SOURCE, IndexRevision.revisionFiles(indexCommit).values().iterator().next());
+ files.put(TAXONOMY_SOURCE, IndexRevision.revisionFiles(taxoCommit).values().iterator().next());
+ return files;
+ }
+
+ /**
+ * Returns a String representation of a revision's version from the given
+ * {@link IndexCommit}s of the search and taxonomy indexes.
+ */
+ public static String revisionVersion(IndexCommit indexCommit, IndexCommit taxoCommit) {
+ return Long.toString(indexCommit.getGeneration(), RADIX) + ":" + Long.toString(taxoCommit.getGeneration(), RADIX);
+ }
+
+ /**
+ * Constructor over the given {@link IndexWriter}. Uses the last
+ * {@link IndexCommit} found in the {@link Directory} managed by the given
+ * writer.
+ */
+ public IndexAndTaxonomyRevision(IndexWriter indexWriter, SnapshotDirectoryTaxonomyWriter taxoWriter)
+ throws IOException {
+ IndexDeletionPolicy delPolicy = indexWriter.getConfig().getIndexDeletionPolicy();
+ if (!(delPolicy instanceof SnapshotDeletionPolicy)) {
+ throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy");
+ }
+ this.indexWriter = indexWriter;
+ this.taxoWriter = taxoWriter;
+ this.indexSDP = (SnapshotDeletionPolicy) delPolicy;
+ this.taxoSDP = taxoWriter.getDeletionPolicy();
+ this.indexCommit = indexSDP.snapshot();
+ this.taxoCommit = taxoSDP.snapshot();
+ this.version = revisionVersion(indexCommit, taxoCommit);
+ this.sourceFiles = revisionFiles(indexCommit, taxoCommit);
+ }
+
+ @Override
+ public int compareTo(String version) {
+ final String[] parts = version.split(":");
+ final long indexGen = Long.parseLong(parts[0], RADIX);
+ final long taxoGen = Long.parseLong(parts[1], RADIX);
+ final long indexCommitGen = indexCommit.getGeneration();
+ final long taxoCommitGen = taxoCommit.getGeneration();
+
+ // if the index generation is not the same as this commit's generation,
+ // compare by it. Otherwise, compare by the taxonomy generation.
+ if (indexCommitGen < indexGen) {
+ return -1;
+ } else if (indexCommitGen > indexGen) {
+ return 1;
+ } else {
+ return taxoCommitGen < taxoGen ? -1 : (taxoCommitGen > taxoGen ? 1 : 0);
+ }
+ }
+
+ @Override
+ public int compareTo(Revision o) {
+ IndexAndTaxonomyRevision other = (IndexAndTaxonomyRevision) o;
+ int cmp = indexCommit.compareTo(other.indexCommit);
+ return cmp != 0 ? cmp : taxoCommit.compareTo(other.taxoCommit);
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public Map<String,List<RevisionFile>> getSourceFiles() {
+ return sourceFiles;
+ }
+
+ @Override
+ public InputStream open(String source, String fileName) throws IOException {
+ assert source.equals(INDEX_SOURCE) || source.equals(TAXONOMY_SOURCE) : "invalid source; expected=(" + INDEX_SOURCE
+ + " or " + TAXONOMY_SOURCE + ") got=" + source;
+ IndexCommit ic = source.equals(INDEX_SOURCE) ? indexCommit : taxoCommit;
+ return new IndexInputInputStream(ic.getDirectory().openInput(fileName, IOContext.READONCE));
+ }
+
+ @Override
+ public void release() throws IOException {
+ try {
+ indexSDP.release(indexCommit);
+ } finally {
+ taxoSDP.release(taxoCommit);
+ }
+
+ try {
+ indexWriter.deleteUnusedFiles();
+ } finally {
+ taxoWriter.getIndexWriter().deleteUnusedFiles();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "IndexAndTaxonomyRevision version=" + version + " files=" + sourceFiles;
+ }
+
+}
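
A hedged publisher-side sketch: publishing a combined revision after committing both writers. The analyzer, Version constant and the indexDir/taxoDir variables are assumptions; StandardAnalyzer comes from the analyzers-common module:

    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new StandardAnalyzer(Version.LUCENE_43));
    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
    IndexWriter indexWriter = new IndexWriter(indexDir, conf);          // indexDir supplied by the application
    SnapshotDirectoryTaxonomyWriter taxoWriter = new SnapshotDirectoryTaxonomyWriter(taxoDir);
    Replicator replicator = new LocalReplicator();

    // ... add documents and categories, then make the new revision visible:
    indexWriter.commit();
    taxoWriter.commit();
    replicator.publish(new IndexAndTaxonomyRevision(indexWriter, taxoWriter));
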
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java
new file mode 100755
index 00000000000..32911948b67
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexInputInputStream.java
@@ -0,0 +1,92 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * An {@link InputStream} which wraps an {@link IndexInput}.
+ *
+ * @lucene.experimental
+ */
+public final class IndexInputInputStream extends InputStream {
+
+ private final IndexInput in;
+
+ private long remaining;
+
+ public IndexInputInputStream(IndexInput in) {
+ this.in = in;
+ remaining = in.length();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remaining == 0) {
+ return -1;
+ } else {
+ --remaining;
+ return in.readByte();
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ return (int) in.length();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (remaining == 0) {
+ return -1;
+ }
+ if (remaining < len) {
+ len = (int) remaining;
+ }
+ in.readBytes(b, off, len);
+ remaining -= len;
+ return len;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (remaining == 0) {
+ return -1;
+ }
+ if (remaining < n) {
+ n = remaining;
+ }
+ in.seek(in.getFilePointer() + n);
+ remaining -= n;
+ return n;
+ }
+
+}
\ No newline at end of file
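
A small usage sketch (assumed directory, file name and output target), copying an index file out of a Directory through standard java.io streams:

    IndexInput input = dir.openInput("_0.cfs", IOContext.READONCE);   // hypothetical file name
    InputStream in = new IndexInputInputStream(input);
    OutputStream out = new FileOutputStream("/tmp/_0.cfs");           // hypothetical target
    try {
      byte[] buf = new byte[1024];
      int numRead;
      while ((numRead = in.read(buf)) > 0) {
        out.write(buf, 0, numRead);
      }
    } finally {
      in.close();
      out.close();
    }
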
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java
new file mode 100755
index 00000000000..325c96d0e60
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexReplicationHandler.java
@@ -0,0 +1,308 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.regex.Matcher;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexNotFoundException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.InfoStream;
+
+/**
+ * A {@link ReplicationHandler} for replication of an index. Implements
+ * {@link #revisionReady} by copying the files pointed to by the client resolver into
+ * the index {@link Directory} and then touching the index with
+ * {@link IndexWriter} to make sure any unused files are deleted.
+ *
+ * NOTE: this handler assumes that {@link IndexWriter} is not opened by
+ * another process on the index directory. In fact, opening an
+ * {@link IndexWriter} on the same directory to which files are copied can lead
+ * to undefined behavior, where some or all of the files may be deleted, other
+ * files may be overwritten, or the directory may simply be left in a mess. When
+ * you replicate an index, it is best
+ * if the index is never modified by {@link IndexWriter}, except the one that is
+ * open on the source index, from which you replicate.
+ *
+ * This handler notifies the application, via a provided {@link Callable}, when an
+ * updated index commit has been made available to it.
+ *
+ * @lucene.experimental
+ */
+public class IndexReplicationHandler implements ReplicationHandler {
+
+ /**
+ * The component used to log messages to the {@link InfoStream#getDefault()
+ * default} {@link InfoStream}.
+ */
+ public static final String INFO_STREAM_COMPONENT = "IndexReplicationHandler";
+
+ private final Directory indexDir;
+ private final Callable<Boolean> callback;
+
+ private volatile Map<String,List<RevisionFile>> currentRevisionFiles;
+ private volatile String currentVersion;
+ private volatile InfoStream infoStream = InfoStream.getDefault();
+
+ /**
+ * Returns the last {@link IndexCommit} found in the {@link Directory}, or
+ * {@code null} if there are no commits.
+ */
+ public static IndexCommit getLastCommit(Directory dir) throws IOException {
+ try {
+ if (DirectoryReader.indexExists(dir)) {
+ List<IndexCommit> commits = DirectoryReader.listCommits(dir);
+ // listCommits guarantees that we get at least one commit back, or
+ // IndexNotFoundException which we handle below
+ return commits.get(commits.size() - 1);
+ }
+ } catch (IndexNotFoundException e) {
+ // ignore the exception and return null
+ }
+ return null;
+ }
+
+ /**
+ * Verifies that the last file is segments_N and fails otherwise. It also
+ * removes and returns the file from the list, because it needs to be handled
+ * last, after all files. This is important in order to guarantee that if a
+ * reader sees the new segments_N, all other segment files are already on
+ * stable storage.
+ *
+ * The reason why the code fails instead of putting segments_N file last is
+ * that this indicates an error in the Revision implementation.
+ */
+ public static String getSegmentsFile(List<String> files, boolean allowEmpty) {
+ if (files.isEmpty()) {
+ if (allowEmpty) {
+ return null;
+ } else {
+ throw new IllegalStateException("empty list of files not allowed");
+ }
+ }
+
+ String segmentsFile = files.remove(files.size() - 1);
+ if (!segmentsFile.startsWith(IndexFileNames.SEGMENTS) || segmentsFile.equals(IndexFileNames.SEGMENTS_GEN)) {
+ throw new IllegalStateException("last file to copy+sync must be segments_N but got " + segmentsFile
+ + "; check your Revision implementation!");
+ }
+ return segmentsFile;
+ }
+
+ /**
+ * Cleanup the index directory by deleting all given files. Called when file
+ * copy or sync failed.
+ */
+ public static void cleanupFilesOnFailure(Directory dir, List<String> files) {
+ for (String file : files) {
+ try {
+ if (dir.fileExists(file)) {
+ dir.deleteFile(file);
+ }
+ } catch (Throwable t) {
+ // suppress any exception because if we're here, it means copy
+ // failed, and we must cleanup after ourselves.
+ }
+ }
+ }
+
+ /**
+ * Cleans up the index directory from old index files. This method uses the
+ * last commit found by {@link #getLastCommit(Directory)}. If it matches the
+ * expected segmentsFile, then all files not referenced by this commit point
+ * are deleted.
+ *
+ * NOTE: this method does a best effort attempt to clean the index
+ * directory. It suppresses any exceptions that occur, as this can be retried
+ * the next time.
+ */
+ public static void cleanupOldIndexFiles(Directory dir, String segmentsFile) {
+ try {
+ IndexCommit commit = getLastCommit(dir);
+ // commit == null means weird IO errors occurred, ignore them
+ // if there were any IO errors reading the expected commit point (i.e.
+ // segments files mismatch), then ignore that commit as well.
+ if (commit != null && commit.getSegmentsFileName().equals(segmentsFile)) {
+ Set<String> commitFiles = new HashSet<String>();
+ commitFiles.addAll(commit.getFileNames());
+ commitFiles.add(IndexFileNames.SEGMENTS_GEN);
+ Matcher matcher = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
+ for (String file : dir.listAll()) {
+ if (!commitFiles.contains(file)
+ && (matcher.reset(file).matches() || file.startsWith(IndexFileNames.SEGMENTS))) {
+ try {
+ dir.deleteFile(file);
+ } catch (Throwable t) {
+ // suppress, it's just a best effort
+ }
+ }
+ }
+ }
+ } catch (Throwable t) {
+ // ignore any errors that happen during this stage. this
+ // cleanup will have a chance to succeed the next time we get a new
+ // revision.
+ }
+ }
+
+ /**
+ * Copies the files from the source directory to the target one, if they are
+ * not the same.
+ */
+ public static void copyFiles(Directory source, Directory target, List<String> files) throws IOException {
+ if (!source.equals(target)) {
+ for (String file : files) {
+ source.copy(target, file, file, IOContext.READONCE);
+ }
+ }
+ }
+
+ /**
+ * Writes {@link IndexFileNames#SEGMENTS_GEN} file to the directory, reading
+ * the generation from the given {@code segmentsFile}. If it is {@code null},
+ * this method deletes segments.gen from the directory.
+ */
+ public static void writeSegmentsGen(String segmentsFile, Directory dir) {
+ if (segmentsFile != null) {
+ SegmentInfos.writeSegmentsGen(dir, SegmentInfos.generationFromSegmentsFileName(segmentsFile));
+ } else {
+ try {
+ if (dir.fileExists(IndexFileNames.SEGMENTS_GEN)) {
+ dir.deleteFile(IndexFileNames.SEGMENTS_GEN);
+ }
+ } catch (Throwable t) {
+ // suppress any errors while deleting this file.
+ }
+ }
+ }
+
+ /**
+ * Constructor with the given index directory and a callback to notify when the
+ * index has been updated.
+ */
+ public IndexReplicationHandler(Directory indexDir, Callable<Boolean> callback) throws IOException {
+ this.callback = callback;
+ this.indexDir = indexDir;
+ currentRevisionFiles = null;
+ currentVersion = null;
+ if (DirectoryReader.indexExists(indexDir)) {
+ final List<IndexCommit> commits = DirectoryReader.listCommits(indexDir);
+ final IndexCommit commit = commits.get(commits.size() - 1);
+ currentRevisionFiles = IndexRevision.revisionFiles(commit);
+ currentVersion = IndexRevision.revisionVersion(commit);
+ final InfoStream infoStream = InfoStream.getDefault();
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion
+ + " currentRevisionFiles=" + currentRevisionFiles);
+ infoStream.message(INFO_STREAM_COMPONENT, "constructor(): commit=" + commit);
+ }
+ }
+ }
+
+ @Override
+ public String currentVersion() {
+ return currentVersion;
+ }
+
+ @Override
+ public Map<String,List<RevisionFile>> currentRevisionFiles() {
+ return currentRevisionFiles;
+ }
+
+ @Override
+ public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles,
+     Map<String,List<String>> copiedFiles, Map<String,Directory> sourceDirectory) throws IOException {
+ if (revisionFiles.size() > 1) {
+ throw new IllegalArgumentException("this handler handles only a single source; got " + revisionFiles.keySet());
+ }
+
+ Directory clientDir = sourceDirectory.values().iterator().next();
+ List<String> files = copiedFiles.values().iterator().next();
+ String segmentsFile = getSegmentsFile(files, false);
+
+ boolean success = false;
+ try {
+ // copy files from the client to index directory
+ copyFiles(clientDir, indexDir, files);
+
+ // fsync all copied files (except segmentsFile)
+ indexDir.sync(files);
+
+ // now copy and fsync segmentsFile
+ clientDir.copy(indexDir, segmentsFile, segmentsFile, IOContext.READONCE);
+ indexDir.sync(Collections.singletonList(segmentsFile));
+
+ success = true;
+ } finally {
+ if (!success) {
+ files.add(segmentsFile); // add it back so it gets deleted too
+ cleanupFilesOnFailure(indexDir, files);
+ }
+ }
+
+ // all files have been successfully copied + sync'd. update the handler's state
+ currentRevisionFiles = revisionFiles;
+ currentVersion = version;
+
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
+ + " currentRevisionFiles=" + currentRevisionFiles);
+ }
+
+ // update the segments.gen file
+ writeSegmentsGen(segmentsFile, indexDir);
+
+ // Cleanup the index directory from old and unused index files.
+ // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
+ // side-effects, e.g. if it hits sudden IO errors while opening the index
+ // (and can end up deleting the entire index). It is not our job to protect
+ // against those errors, app will probably hit them elsewhere.
+ cleanupOldIndexFiles(indexDir, segmentsFile);
+
+ // successfully updated the index, notify the callback that the index is
+ // ready.
+ if (callback != null) {
+ try {
+ callback.call();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /** Sets the {@link InfoStream} to use for logging messages. */
+ public void setInfoStream(InfoStream infoStream) {
+ if (infoStream == null) {
+ infoStream = InfoStream.NO_OUTPUT;
+ }
+ this.infoStream = infoStream;
+ }
+
+}
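
A minimal consumer-side sketch (not part of the patch); the Callable<Boolean> type and the path are assumptions, and the handler is subsequently handed to a ReplicationClient, which does not appear in this section:

    Directory indexDir = FSDirectory.open(new File("/replica/index"));  // hypothetical path
    IndexReplicationHandler handler = new IndexReplicationHandler(indexDir, new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        // a new commit is now visible in indexDir; reopen readers/searchers here
        return true;
      }
    });
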
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java
new file mode 100755
index 00000000000..d135a3ddb1e
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/IndexRevision.java
@@ -0,0 +1,150 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+/**
+ * A {@link Revision} of a single index, which comprises the list of files
+ * that are part of the current {@link IndexCommit}. To ensure the files are not
+ * deleted by {@link IndexWriter} for as long as this revision stays alive (i.e.
+ * until {@link #release()}), the current commit point is snapshotted, using
+ * {@link SnapshotDeletionPolicy} (this means that the given writer's
+ * {@link IndexWriterConfig#getIndexDeletionPolicy() config} should return
+ * {@link SnapshotDeletionPolicy}).
+ *
+ * When this revision is {@link #release() released}, it releases the obtained
+ * snapshot as well as calls {@link IndexWriter#deleteUnusedFiles()} so that the
+ * snapshotted files are deleted (if they are no longer needed).
+ *
+ * @lucene.experimental
+ */
+public class IndexRevision implements Revision {
+
+ private static final int RADIX = 16;
+ private static final String SOURCE = "index";
+
+ private final IndexWriter writer;
+ private final IndexCommit commit;
+ private final SnapshotDeletionPolicy sdp;
+ private final String version;
+ private final Map<String,List<RevisionFile>> sourceFiles;
+
+ // returns a RevisionFile with some metadata
+ private static RevisionFile newRevisionFile(String file, Directory dir) throws IOException {
+ RevisionFile revFile = new RevisionFile(file);
+ revFile.size = dir.fileLength(file);
+ return revFile;
+ }
+
+ /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */
+ public static Map<String,List<RevisionFile>> revisionFiles(IndexCommit commit) throws IOException {
+   Collection<String> commitFiles = commit.getFileNames();
+   List<RevisionFile> revisionFiles = new ArrayList<RevisionFile>(commitFiles.size());
+ String segmentsFile = commit.getSegmentsFileName();
+ Directory dir = commit.getDirectory();
+
+ for (String file : commitFiles) {
+ if (!file.equals(segmentsFile)) {
+ revisionFiles.add(newRevisionFile(file, dir));
+ }
+ }
+ revisionFiles.add(newRevisionFile(segmentsFile, dir)); // segments_N must be last
+ return Collections.singletonMap(SOURCE, revisionFiles);
+ }
+
+ /**
+ * Returns a String representation of a revision's version from the given
+ * {@link IndexCommit}.
+ */
+ public static String revisionVersion(IndexCommit commit) {
+ return Long.toString(commit.getGeneration(), RADIX);
+ }
+
+ /**
+ * Constructor over the given {@link IndexWriter}. Uses the last
+ * {@link IndexCommit} found in the {@link Directory} managed by the given
+ * writer.
+ */
+ public IndexRevision(IndexWriter writer) throws IOException {
+ IndexDeletionPolicy delPolicy = writer.getConfig().getIndexDeletionPolicy();
+ if (!(delPolicy instanceof SnapshotDeletionPolicy)) {
+ throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy");
+ }
+ this.writer = writer;
+ this.sdp = (SnapshotDeletionPolicy) delPolicy;
+ this.commit = sdp.snapshot();
+ this.version = revisionVersion(commit);
+ this.sourceFiles = revisionFiles(commit);
+ }
+
+ @Override
+ public int compareTo(String version) {
+ long gen = Long.parseLong(version, RADIX);
+ long commitGen = commit.getGeneration();
+ return commitGen < gen ? -1 : (commitGen > gen ? 1 : 0);
+ }
+
+ @Override
+ public int compareTo(Revision o) {
+ IndexRevision other = (IndexRevision) o;
+ return commit.compareTo(other.commit);
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public Map<String,List<RevisionFile>> getSourceFiles() {
+ return sourceFiles;
+ }
+
+ @Override
+ public InputStream open(String source, String fileName) throws IOException {
+ assert source.equals(SOURCE) : "invalid source; expected=" + SOURCE + " got=" + source;
+ return new IndexInputInputStream(commit.getDirectory().openInput(fileName, IOContext.READONCE));
+ }
+
+ @Override
+ public void release() throws IOException {
+ sdp.release(commit);
+ writer.deleteUnusedFiles();
+ }
+
+ @Override
+ public String toString() {
+ return "IndexRevision version=" + version + " files=" + sourceFiles;
+ }
+
+}
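
A hedged publisher-side sketch, mirroring the SnapshotDeletionPolicy wiring the constructor requires; the analyzer, Version constant and indexDir variable are assumptions:

    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new StandardAnalyzer(Version.LUCENE_43));
    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
    IndexWriter writer = new IndexWriter(indexDir, conf);

    Replicator replicator = new LocalReplicator();
    // after every commit that should become visible to replicas:
    writer.commit();
    replicator.publish(new IndexRevision(writer));
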
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java b/lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java
new file mode 100755
index 00000000000..4ab746c11f0
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/LocalReplicator.java
@@ -0,0 +1,247 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.store.AlreadyClosedException;
+
+/**
+ * A {@link Replicator} implementation for use by the side that publishes
+ * {@link Revision}s, as well as for clients to {@link #checkForUpdate(String)
+ * check for updates}. When a client needs to be updated, it is returned a
+ * {@link SessionToken} through which it can
+ * {@link #obtainFile(String, String, String) obtain} the files of that
+ * revision. As long as a revision is being replicated, this replicator
+ * guarantees that it will not be {@link Revision#release() released}.
+ *
+ * Replication sessions expire by default after
+ * {@link #DEFAULT_SESSION_EXPIRATION_THRESHOLD}, and the threshold can be
+ * configured through {@link #setExpirationThreshold(long)}.
+ *
+ * @lucene.experimental
+ */
+public class LocalReplicator implements Replicator {
+
+ private static class RefCountedRevision {
+ private final AtomicInteger refCount = new AtomicInteger(1);
+ public final Revision revision;
+
+ public RefCountedRevision(Revision revision) {
+ this.revision = revision;
+ }
+
+ public void decRef() throws IOException {
+ if (refCount.get() <= 0) {
+ throw new IllegalStateException("this revision is already released");
+ }
+
+ final int rc = refCount.decrementAndGet();
+ if (rc == 0) {
+ boolean success = false;
+ try {
+ revision.release();
+ success = true;
+ } finally {
+ if (!success) {
+ // Put reference back on failure
+ refCount.incrementAndGet();
+ }
+ }
+ } else if (rc < 0) {
+ throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement");
+ }
+ }
+
+ public void incRef() {
+ refCount.incrementAndGet();
+ }
+
+ }
+
+ private static class ReplicationSession {
+ public final SessionToken session;
+ public final RefCountedRevision revision;
+ private volatile long lastAccessTime;
+
+ ReplicationSession(SessionToken session, RefCountedRevision revision) {
+ this.session = session;
+ this.revision = revision;
+ lastAccessTime = System.currentTimeMillis();
+ }
+
+ boolean isExpired(long expirationThreshold) {
+ return lastAccessTime < (System.currentTimeMillis() - expirationThreshold);
+ }
+
+ void markAccessed() {
+ lastAccessTime = System.currentTimeMillis();
+ }
+ }
+
+ /** Threshold for expiring inactive sessions. Defaults to 30 minutes. */
+ public static final long DEFAULT_SESSION_EXPIRATION_THRESHOLD = 1000 * 60 * 30;
+
+ private long expirationThresholdMilllis = LocalReplicator.DEFAULT_SESSION_EXPIRATION_THRESHOLD;
+
+ private volatile RefCountedRevision currentRevision;
+ private volatile boolean closed = false;
+
+ private final AtomicInteger sessionToken = new AtomicInteger(0);
+ private final Map sessions = new HashMap();
+
+ private void checkExpiredSessions() throws IOException {
+ // make a "to-delete" list so we don't risk deleting from the map while iterating it
+ final ArrayList toExpire = new ArrayList();
+ for (ReplicationSession token : sessions.values()) {
+ if (token.isExpired(expirationThresholdMilllis)) {
+ toExpire.add(token);
+ }
+ }
+ for (ReplicationSession token : toExpire) {
+ releaseSession(token.session.id);
+ }
+ }
+
+ private void releaseSession(String sessionID) throws IOException {
+ ReplicationSession session = sessions.remove(sessionID);
+ // if we're called concurrently by close() and release(), could be that one
+ // thread beats the other to release the session.
+ if (session != null) {
+ session.revision.decRef();
+ }
+ }
+
+ /** Ensure that replicator is still open, or throw {@link AlreadyClosedException} otherwise. */
+ protected final synchronized void ensureOpen() {
+ if (closed) {
+ throw new AlreadyClosedException("This replicator has already been closed");
+ }
+ }
+
+ @Override
+ public synchronized SessionToken checkForUpdate(String currentVersion) {
+ ensureOpen();
+ if (currentRevision == null) { // no published revisions yet
+ return null;
+ }
+
+ if (currentVersion != null && currentRevision.revision.compareTo(currentVersion) <= 0) {
+ // currentVersion is newer or equal to latest published revision
+ return null;
+ }
+
+ // currentVersion is either null or older than latest published revision
+ currentRevision.incRef();
+ final String sessionID = Integer.toString(sessionToken.incrementAndGet());
+ final SessionToken sessionToken = new SessionToken(sessionID, currentRevision.revision);
+ final ReplicationSession timedSessionToken = new ReplicationSession(sessionToken, currentRevision);
+ sessions.put(sessionID, timedSessionToken);
+ return sessionToken;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ // release all managed revisions
+ for (ReplicationSession session : sessions.values()) {
+ session.revision.decRef();
+ }
+ sessions.clear();
+ closed = true;
+ }
+ }
+
+ /**
+ * Returns the expiration threshold.
+ *
+ * @see #setExpirationThreshold(long)
+ */
+ public long getExpirationThreshold() {
+ return expirationThresholdMillis;
+ }
+
+ @Override
+ public synchronized InputStream obtainFile(String sessionID, String source, String fileName) throws IOException {
+ ensureOpen();
+ ReplicationSession session = sessions.get(sessionID);
+ if (session != null && session.isExpired(expirationThresholdMillis)) {
+ releaseSession(sessionID);
+ session = null;
+ }
+ // session either previously expired, or we just expired it
+ if (session == null) {
+ throw new SessionExpiredException("session (" + sessionID + ") expired while obtaining file: source=" + source
+ + " file=" + fileName);
+ }
+ session.markAccessed();
+ return session.revision.revision.open(source, fileName);
+ }
+
+ @Override
+ public synchronized void publish(Revision revision) throws IOException {
+ ensureOpen();
+ if (currentRevision != null) {
+ int compare = revision.compareTo(currentRevision.revision);
+ if (compare == 0) {
+ // same revision published again, ignore but release it
+ revision.release();
+ return;
+ }
+
+ if (compare < 0) {
+ revision.release();
+ throw new IllegalArgumentException("Cannot publish an older revision: rev=" + revision + " current="
+ + currentRevision);
+ }
+ }
+
+ // swap revisions
+ final RefCountedRevision oldRevision = currentRevision;
+ currentRevision = new RefCountedRevision(revision);
+ if (oldRevision != null) {
+ oldRevision.decRef();
+ }
+
+ // check for expired sessions
+ checkExpiredSessions();
+ }
+
+ @Override
+ public synchronized void release(String sessionID) throws IOException {
+ ensureOpen();
+ releaseSession(sessionID);
+ }
+
+ /**
+ * Modify the session expiration time: if a replication session has been
+ * inactive for that long, it is automatically expired, and further attempts
+ * to operate within it will throw a {@link SessionExpiredException}.
+ */
+ public synchronized void setExpirationThreshold(long expirationThreshold) throws IOException {
+ ensureOpen();
+ this.expirationThresholdMillis = expirationThreshold;
+ checkExpiredSessions();
+ }
+
+}
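
A minimal producer-side sketch (not part of the patch) of how LocalReplicator is
meant to be used; "rev1" and "rev2" stand in for any Revision implementation:

    LocalReplicator replicator = new LocalReplicator();
    replicator.setExpirationThreshold(1000 * 60 * 5); // expire idle sessions after 5 minutes

    replicator.publish(rev1);  // rev1 becomes the current revision
    replicator.publish(rev2);  // newer revision; rev1 is released once no session references it

    // publishing rev2 again is a no-op (the duplicate is released immediately),
    // while publishing an older revision throws IllegalArgumentException

    replicator.close();        // releases the revisions still held by live sessions
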
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java b/lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java
new file mode 100755
index 00000000000..3dcd1b3fbc2
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/PerSessionDirectoryFactory.java
@@ -0,0 +1,77 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+
+/**
+ * A {@link SourceDirectoryFactory} which returns {@link FSDirectory} under a
+ * dedicated session directory. When a session is over, the entire directory is
+ * deleted.
+ *
+ * @lucene.experimental
+ */
+public class PerSessionDirectoryFactory implements SourceDirectoryFactory {
+
+ private final File workDir;
+
+ /** Constructor with the given sources mapping. */
+ public PerSessionDirectoryFactory(File workDir) {
+ this.workDir = workDir;
+ }
+
+ private void rm(File file) throws IOException {
+ if (file.isDirectory()) {
+ for (File f : file.listFiles()) {
+ rm(f);
+ }
+ }
+
+ // This should be either an empty directory, or a file
+ if (!file.delete() && file.exists()) {
+ throw new IOException("failed to delete " + file);
+ }
+ }
+
+ @Override
+ public Directory getDirectory(String sessionID, String source) throws IOException {
+ File sessionDir = new File(workDir, sessionID);
+ if (!sessionDir.exists() && !sessionDir.mkdirs()) {
+ throw new IOException("failed to create session directory " + sessionDir);
+ }
+ File sourceDir = new File(sessionDir, source);
+ if (!sourceDir.mkdirs()) {
+ throw new IOException("failed to create source directory " + sourceDir);
+ }
+ return FSDirectory.open(sourceDir);
+ }
+
+ @Override
+ public void cleanupSession(String sessionID) throws IOException {
+ if (sessionID.isEmpty()) { // protect against deleting workDir entirely!
+ throw new IllegalArgumentException("sessionID cannot be empty");
+ }
+ rm(new File(workDir, sessionID));
+ }
+
+}
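
For illustration, assuming a hypothetical working directory /tmp/replica-work, the
factory lays out one sub-directory per session and source:

    SourceDirectoryFactory factory = new PerSessionDirectoryFactory(new File("/tmp/replica-work"));
    Directory dir = factory.getDirectory("17", "index"); // -> /tmp/replica-work/17/index
    try {
      // copy the files of source "index" for session "17" into dir ...
    } finally {
      dir.close();
      factory.cleanupSession("17"); // recursively deletes /tmp/replica-work/17
    }
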
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java b/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java
new file mode 100755
index 00000000000..93973514982
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java
@@ -0,0 +1,416 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A client which monitors and obtains new revisions from a {@link Replicator}.
+ * It can be used to either periodically check for updates by invoking
+ * {@link #startUpdateThread}, or manually by calling {@link #updateNow()}.
+ *
+ * Whenever a new revision is available, the {@link #requiredFiles(Map)} are
+ * copied to the {@link Directory} specified by the {@link SourceDirectoryFactory} and
+ * a handler is notified.
+ *
+ * @lucene.experimental
+ */
+public class ReplicationClient implements Closeable {
+
+ private class ReplicationThread extends Thread {
+
+ private final long interval;
+
+ // client uses this to stop us
+ final CountDownLatch stop = new CountDownLatch(1);
+
+ public ReplicationThread(long interval) {
+ this.interval = interval;
+ }
+
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public void run() {
+ while (true) {
+ long time = System.currentTimeMillis();
+ updateLock.lock();
+ try {
+ doUpdate();
+ } catch (Throwable t) {
+ handleUpdateException(t);
+ } finally {
+ updateLock.unlock();
+ }
+ time = System.currentTimeMillis() - time;
+
+ // adjust the timeout to compensate for the time spent doing the replication.
+ final long timeout = interval - time;
+ if (timeout > 0) {
+ try {
+ // this will return immediately if we were ordered to stop (count=0)
+ // or the timeout has elapsed. if it returns true, it means count=0,
+ // so terminate.
+ if (stop.await(timeout, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ } catch (InterruptedException e) {
+ // if we were interrupted, somebody wants to terminate us, so just
+ // propagate the exception further.
+ Thread.currentThread().interrupt();
+ throw new ThreadInterruptedException(e);
+ }
+ }
+ }
+ }
+
+ }
+
+ /** Handler for revisions obtained by the client. */
+ public static interface ReplicationHandler {
+
+ /** Returns the current revision files held by the handler. */
+ public Map<String, List<RevisionFile>> currentRevisionFiles();
+
+ /** Returns the current revision version held by the handler. */
+ public String currentVersion();
+
+ /**
+ * Called when a new revision was obtained and is available (i.e. all needed
+ * files were successfully copied).
+ *
+ * @param version
+ * the version of the {@link Revision} that was copied
+ * @param revisionFiles
+ * the files contained by this {@link Revision}
+ * @param copiedFiles
+ * the files that were actually copied
+ * @param sourceDirectory
+ * a mapping from a source of files to the {@link Directory} they
+ * were copied into
+ */
+ public void revisionReady(String version, Map<String, List<RevisionFile>> revisionFiles,
+ Map<String, List<String>> copiedFiles, Map<String, Directory> sourceDirectory) throws IOException;
+ }
+
+ /**
+ * Resolves a session and source into a {@link Directory} into which the
+ * session files should be copied.
+ */
+ public static interface SourceDirectoryFactory {
+
+ /**
+ * Called to denote that the replication actions for this session were finished and the directory is no longer needed.
+ */
+ public void cleanupSession(String sessionID) throws IOException;
+
+ /**
+ * Returns the {@link Directory} to use for the given session and source.
+ * Implementations may e.g. return different directories for different
+ * sessions, or the same directory for all sessions. In that case, it is
+ * advised to clean the directory before it is used for a new session.
+ *
+ * @see #cleanupSession(String)
+ */
+ public Directory getDirectory(String sessionID, String source) throws IOException;
+
+ }
+
+ /** The component name to use with {@link InfoStream#isEnabled(String)}. */
+ public static final String INFO_STREAM_COMPONENT = "ReplicationThread";
+
+ private final Replicator replicator;
+ private final ReplicationHandler handler;
+ private final SourceDirectoryFactory factory;
+ private final byte[] copyBuffer = new byte[16384];
+ private final Lock updateLock = new ReentrantLock();
+
+ private volatile ReplicationThread updateThread;
+ private volatile boolean closed = false;
+ private volatile InfoStream infoStream = InfoStream.getDefault();
+
+ /**
+ * Constructor.
+ *
+ * @param replicator the {@link Replicator} used for checking for updates
+ * @param handler notified when new revisions are ready
+ * @param factory returns a {@link Directory} for a given source and session
+ */
+ public ReplicationClient(Replicator replicator, ReplicationHandler handler, SourceDirectoryFactory factory) {
+ this.replicator = replicator;
+ this.handler = handler;
+ this.factory = factory;
+ }
+
+ private void copyBytes(IndexOutput out, InputStream in) throws IOException {
+ int numBytes;
+ while ((numBytes = in.read(copyBuffer)) > 0) {
+ out.writeBytes(copyBuffer, 0, numBytes);
+ }
+ }
+
+ private void doUpdate() throws IOException {
+ SessionToken session = null;
+ final Map<String, Directory> sourceDirectory = new HashMap<String, Directory>();
+ final Map<String, List<String>> copiedFiles = new HashMap<String, List<String>>();
+ boolean notify = false;
+ try {
+ final String version = handler.currentVersion();
+ session = replicator.checkForUpdate(version);
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): handlerVersion=" + version + " session=" + session);
+ }
+ if (session == null) {
+ // already up to date
+ return;
+ }
+ Map<String, List<RevisionFile>> requiredFiles = requiredFiles(session.sourceFiles);
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): requiredFiles=" + requiredFiles);
+ }
+ for (Entry<String, List<RevisionFile>> e : requiredFiles.entrySet()) {
+ String source = e.getKey();
+ Directory dir = factory.getDirectory(session.id, source);
+ sourceDirectory.put(source, dir);
+ List<String> cpFiles = new ArrayList<String>();
+ copiedFiles.put(source, cpFiles);
+ for (RevisionFile file : e.getValue()) {
+ if (closed) {
+ // if we're closed, abort file copy
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): detected client was closed); abort file copy");
+ }
+ return;
+ }
+ InputStream in = null;
+ IndexOutput out = null;
+ try {
+ in = replicator.obtainFile(session.id, source, file.fileName);
+ out = dir.createOutput(file.fileName, IOContext.DEFAULT);
+ copyBytes(out, in);
+ cpFiles.add(file.fileName);
+ // TODO add some validation, on size / checksum
+ } finally {
+ IOUtils.close(in, out);
+ }
+ }
+ }
+ // only notify if all required files were successfully obtained.
+ notify = true;
+ } finally {
+ if (session != null) {
+ try {
+ replicator.release(session.id);
+ } finally {
+ if (!notify) { // cleanup after ourselves
+ IOUtils.close(sourceDirectory.values());
+ factory.cleanupSession(session.id);
+ }
+ }
+ }
+ }
+
+ // notify outside the try-finally above, so the session is released sooner.
+ // the handler may take time to finish acting on the copied files, but the
+ // session itself is no longer needed.
+ try {
+ if (notify && !closed) { // no use to notify if we are closed already
+ handler.revisionReady(session.version, session.sourceFiles, copiedFiles, sourceDirectory);
+ }
+ } finally {
+ IOUtils.close(sourceDirectory.values());
+ if (session != null) {
+ factory.cleanupSession(session.id);
+ }
+ }
+ }
+
+ /** Throws {@link AlreadyClosedException} if the client has already been closed. */
+ protected final void ensureOpen() {
+ if (closed) {
+ throw new AlreadyClosedException("this update client has already been closed");
+ }
+ }
+
+ /**
+ * Called when an exception is hit by the replication thread. The default
+ * implementation prints the full stacktrace to the {@link InfoStream} set in
+ * {@link #setInfoStream(InfoStream)}, or the {@link InfoStream#getDefault()
+ * default} one. You can override this method to log the exception elsewhere.
+ *
+ * NOTE: if you override this method to throw the exception further,
+ * the replication thread will be terminated. The only way to restart it is to
+ * call {@link #stopUpdateThread()} followed by
+ * {@link #startUpdateThread(long, String)}.
+ */
+ protected void handleUpdateException(Throwable t) {
+ final StringWriter sw = new StringWriter();
+ t.printStackTrace(new PrintWriter(sw));
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "an error occurred during revision update: " + sw.toString());
+ }
+ }
+
+ /**
+ * Returns the files required for replication. By default, this method returns
+ * all files that exist in the new revision, but not in the handler.
+ */
+ protected Map<String, List<RevisionFile>> requiredFiles(Map<String, List<RevisionFile>> newRevisionFiles) {
+ Map<String, List<RevisionFile>> handlerRevisionFiles = handler.currentRevisionFiles();
+ if (handlerRevisionFiles == null) {
+ return newRevisionFiles;
+ }
+
+ Map<String, List<RevisionFile>> requiredFiles = new HashMap<String, List<RevisionFile>>();
+ for (Entry<String, List<RevisionFile>> e : handlerRevisionFiles.entrySet()) {
+ // put the handler files in a Set, for faster contains() checks later
+ Set<String> handlerFiles = new HashSet<String>();
+ for (RevisionFile file : e.getValue()) {
+ handlerFiles.add(file.fileName);
+ }
+
+ // make sure to preserve revisionFiles order
+ ArrayList<RevisionFile> res = new ArrayList<RevisionFile>();
+ String source = e.getKey();
+ assert newRevisionFiles.containsKey(source) : "source not found in newRevisionFiles: " + newRevisionFiles;
+ for (RevisionFile file : newRevisionFiles.get(source)) {
+ if (!handlerFiles.contains(file.fileName)) {
+ res.add(file);
+ }
+ }
+ requiredFiles.put(source, res);
+ }
+
+ return requiredFiles;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ stopUpdateThread();
+ closed = true;
+ }
+ }
+
+ /**
+ * Start the update thread with the specified interval in milliseconds. For
+ * debugging purposes, you can optionally pass a name to set on the thread via
+ * {@link Thread#setName(String)}. If you pass {@code null}, a default name
+ * will be set.
+ *
+ * @throws IllegalStateException if the thread has already been started
+ */
+ public synchronized void startUpdateThread(long intervalMillis, String threadName) {
+ ensureOpen();
+ if (updateThread != null && updateThread.isAlive()) {
+ throw new IllegalStateException(
+ "cannot start an update thread when one is running, must first call 'stopUpdateThread()'");
+ }
+ threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName;
+ updateThread = new ReplicationThread(intervalMillis);
+ updateThread.setName(threadName);
+ updateThread.start();
+ // we rely on isAlive to return true in isUpdateThreadAlive, assert to be on the safe side
+ assert updateThread.isAlive() : "updateThread started but not alive?";
+ }
+
+ /**
+ * Stop the update thread. If the update thread is not running, silently does
+ * nothing. This method returns after the update thread has stopped.
+ */
+ public synchronized void stopUpdateThread() {
+ if (updateThread != null) {
+ // this will trigger the thread to terminate if it awaits the lock.
+ // otherwise, if it's in the middle of replication, we wait for it to
+ // stop.
+ updateThread.stop.countDown();
+ try {
+ updateThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ThreadInterruptedException(e);
+ }
+ updateThread = null;
+ }
+ }
+
+ /**
+ * Returns true if the update thread is alive. The update thread is alive if
+ * it has been {@link #startUpdateThread(long, String) started} and not
+ * {@link #stopUpdateThread() stopped}, and has not hit an error which
+ * caused it to terminate (i.e. {@link #handleUpdateException(Throwable)}
+ * threw the exception further).
+ */
+ public synchronized boolean isUpdateThreadAlive() {
+ return updateThread != null && updateThread.isAlive();
+ }
+
+ @Override
+ public String toString() {
+ String res = "ReplicationClient";
+ if (updateThread != null) {
+ res += " (" + updateThread.getName() + ")";
+ }
+ return res;
+ }
+
+ /**
+ * Executes the update operation immediately, regardless of whether an update
+ * thread is running.
+ */
+ public void updateNow() throws IOException {
+ ensureOpen();
+ updateLock.lock();
+ try {
+ doUpdate();
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
+ /** Sets the {@link InfoStream} to use for logging messages. */
+ public void setInfoStream(InfoStream infoStream) {
+ if (infoStream == null) {
+ infoStream = InfoStream.NO_OUTPUT;
+ }
+ this.infoStream = infoStream;
+ }
+
+}
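
A wiring sketch (illustrative; "handler" stands in for any ReplicationHandler
implementation and the work directory is hypothetical):

    Replicator replicator = new LocalReplicator();
    SourceDirectoryFactory factory = new PerSessionDirectoryFactory(new File("/tmp/replica-work"));
    ReplicationClient client = new ReplicationClient(replicator, handler, factory);

    client.updateNow();                        // one-shot, synchronous update check
    client.startUpdateThread(60000, "backup"); // or poll for updates once a minute
    // ... later, when shutting down:
    client.stopUpdateThread();
    client.close();
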
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java b/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java
new file mode 100755
index 00000000000..3e4d4e5f42a
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java
@@ -0,0 +1,80 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An interface for replicating files. Allows a producer to
+ * {@link #publish(Revision) publish} {@link Revision}s and consumers to
+ * {@link #checkForUpdate(String) check for updates}. When a client needs to be
+ * updated, it is given a {@link SessionToken} through which it can
+ * {@link #obtainFile(String, String, String) obtain} the files of that
+ * revision. After the client has finished obtaining all the files, it should
+ * {@link #release(String) release} the given session, so that the files can be
+ * reclaimed if they are not needed anymore.
+ *
+ * A client is always updated to the newest revision available. That is, if a
+ * client is on revision r1 and revisions r2 and r3
+ * were published, then the next time the client checks for an update, it will
+ * receive r3.
+ *
+ * @lucene.experimental
+ */
+public interface Replicator extends Closeable {
+
+ /**
+ * Publish a new {@link Revision} for consumption by clients. It is the
+ * caller's responsibility to verify that the revision files exist and can be
+ * read by clients. When the revision is no longer needed, it will be
+ * {@link Revision#release() released} by the replicator.
+ */
+ public void publish(Revision revision) throws IOException;
+
+ /**
+ * Check whether the given version is up-to-date and returns a
+ * {@link SessionToken} which can be used for fetching the revision files,
+ * otherwise returns {@code null}.
+ *
+ * NOTE: when the returned session token is no longer needed, you
+ * should call {@link #release(String)} so that the session resources can be
+ * reclaimed, including the revision files.
+ */
+ public SessionToken checkForUpdate(String currVersion) throws IOException;
+
+ /**
+ * Notify that the specified {@link SessionToken} is no longer needed by the
+ * caller.
+ */
+ public void release(String sessionID) throws IOException;
+
+ /**
+ * Returns an {@link InputStream} for the requested file and source in the
+ * context of the given {@link SessionToken#id session}.
+ *
+ * NOTE: it is the caller's responsibility to close the returned
+ * stream.
+ *
+ * @throws SessionExpiredException if the specified session has already
+ * expired
+ */
+ public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException;
+
+}
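
The consumer side of this contract, roughly the loop that ReplicationClient performs
(file copying and error handling elided):

    SessionToken session = replicator.checkForUpdate(currentVersion); // null means already up to date
    if (session != null) {
      try {
        for (Map.Entry<String, List<RevisionFile>> e : session.sourceFiles.entrySet()) {
          for (RevisionFile file : e.getValue()) {
            InputStream in = replicator.obtainFile(session.id, e.getKey(), file.fileName);
            try {
              // copy the stream into a local Directory ...
            } finally {
              in.close();
            }
          }
        }
      } finally {
        replicator.release(session.id); // allow the revision files to be reclaimed
      }
    }
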
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java b/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java
new file mode 100755
index 00000000000..9df22fbe91c
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java
@@ -0,0 +1,75 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A revision comprises lists of files that come from different sources and need
+ * to be replicated together to e.g. guarantee that all resources are in sync.
+ * In most cases an application will replicate a single index, and so the
+ * revision will contain files from a single source. However, some applications
+ * may need to treat a collection of indexes as a single entity so that the
+ * files from all sources are replicated together, to guarantee consistency
+ * between them. For example, an application which indexes facets will need to
+ * replicate both the search and taxonomy indexes together, to guarantee that
+ * they match at the client side.
+ *
+ * @lucene.experimental
+ */
+public interface Revision extends Comparable<Revision> {
+
+ /**
+ * Compares the revision to the given version string. Behaves like
+ * {@link Comparable#compareTo(Object)}.
+ */
+ public int compareTo(String version);
+
+ /**
+ * Returns a string representation of the version of this revision. The
+ * version is used by {@link #compareTo(String)} as well as to
+ * serialize/deserialize revision information. Therefore it must be
+ * self-descriptive as well as sufficient to distinguish one revision from another.
+ */
+ public String getVersion();
+
+ /**
+ * Returns the files that comprise this revision, as a mapping from a source
+ * to a list of files.
+ */
+ public Map<String, List<RevisionFile>> getSourceFiles();
+
+ /**
+ * Returns an {@link InputStream} for the given fileName and source. It is the
+ * caller's responsibility to close the {@link InputStream} when it has been
+ * consumed.
+ */
+ public InputStream open(String source, String fileName) throws IOException;
+
+ /**
+ * Called when this revision can be safely released, i.e. when there are no
+ * more references to it.
+ */
+ public void release() throws IOException;
+
+}
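
To make the contract concrete (sketch; the source names are hypothetical): a revision
that spans a search index and a taxonomy index exposes two sources, and a Replicator
uses compareTo(String) to decide whether a client is outdated:

    Map<String, List<RevisionFile>> files = revision.getSourceFiles();
    // e.g. files.keySet() == {"index", "taxonomy"}, each mapped to the
    // RevisionFiles that must be copied together for that source

    if (revision.compareTo(clientVersion) <= 0) {
      // the client already holds this revision (or a newer one); nothing to copy
    }
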
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java b/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java
new file mode 100755
index 00000000000..3fc8cffcb73
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java
@@ -0,0 +1,59 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Describes a file in a {@link Revision}. A file has a source, which allows a
+ * single revision to contain files from multiple sources (e.g. multiple
+ * indexes).
+ *
+ * @lucene.experimental
+ */
+public class RevisionFile {
+
+ /** The name of the file. */
+ public final String fileName;
+
+ /** The size of the file denoted by {@link #fileName}. */
+ public long size = -1;
+
+ /** Constructor with the given file name. */
+ public RevisionFile(String fileName) {
+ if (fileName == null || fileName.isEmpty()) {
+ throw new IllegalArgumentException("fileName cannot be null or empty");
+ }
+ this.fileName = fileName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ RevisionFile other = (RevisionFile) obj;
+ return fileName.equals(other.fileName) && size == other.size;
+ }
+
+ @Override
+ public int hashCode() {
+ return fileName.hashCode() ^ (int) (size ^ (size >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "fileName=" + fileName + " size=" + size;
+ }
+
+}
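
For example (illustrative; the segment file name is hypothetical):

    RevisionFile file = new RevisionFile("_0.cfs");
    file.size = 1024L;
    // two RevisionFile instances are equal only if both fileName and size match
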
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java b/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java
new file mode 100755
index 00000000000..4b697c3c3ce
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java
@@ -0,0 +1,54 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that a revision update session was expired due to lack
+ * of activity.
+ *
+ * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD
+ * @see LocalReplicator#setExpirationThreshold(long)
+ *
+ * @lucene.experimental
+ */
+public class SessionExpiredException extends IOException {
+
+ /**
+ * @see IOException#IOException(String, Throwable)
+ */
+ public SessionExpiredException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @see IOException#IOException(String)
+ */
+ public SessionExpiredException(String message) {
+ super(message);
+ }
+
+ /**
+ * @see IOException#IOException(Throwable)
+ */
+ public SessionExpiredException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java b/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java
new file mode 100755
index 00000000000..90b6e41f9a9
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java
@@ -0,0 +1,108 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Token for a replication session, for guaranteeing that source replicated
+ * files will be kept safe until the replication completes.
+ *
+ * @see Replicator#checkForUpdate(String)
+ * @see Replicator#release(String)
+ * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD
+ *
+ * @lucene.experimental
+ */
+public final class SessionToken {
+
+ /**
+ * ID of this session.
+ * Should be passed when releasing the session, thereby notifying the
+ * {@link Replicator} that this session is no longer in use.
+ * @see Replicator#release(String)
+ */
+ public final String id;
+
+ /**
+ * @see Revision#getVersion()
+ */
+ public final String version;
+
+ /**
+ * @see Revision#getSourceFiles()
+ */
+ public final Map<String, List<RevisionFile>> sourceFiles;
+
+ /** Constructor which deserializes from the given {@link DataInput}. */
+ public SessionToken(DataInput in) throws IOException {
+ this.id = in.readUTF();
+ this.version = in.readUTF();
+ this.sourceFiles = new HashMap<String, List<RevisionFile>>();
+ int numSources = in.readInt();
+ while (numSources > 0) {
+ String source = in.readUTF();
+ int numFiles = in.readInt();
+ List<RevisionFile> files = new ArrayList<RevisionFile>(numFiles);
+ for (int i = 0; i < numFiles; i++) {
+ String fileName = in.readUTF();
+ RevisionFile file = new RevisionFile(fileName);
+ file.size = in.readLong();
+ files.add(file);
+ }
+ this.sourceFiles.put(source, files);
+ --numSources;
+ }
+ }
+
+ /** Constructor with the given id and revision. */
+ public SessionToken(String id, Revision revision) {
+ this.id = id;
+ this.version = revision.getVersion();
+ this.sourceFiles = revision.getSourceFiles();
+ }
+
+ /** Serialize the token data for communication between server and client. */
+ public void serialize(DataOutput out) throws IOException {
+ out.writeUTF(id);
+ out.writeUTF(version);
+ out.writeInt(sourceFiles.size());
+ for (Entry<String, List<RevisionFile>> e : sourceFiles.entrySet()) {
+ out.writeUTF(e.getKey());
+ List<RevisionFile> files = e.getValue();
+ out.writeInt(files.size());
+ for (RevisionFile file : files) {
+ out.writeUTF(file.fileName);
+ out.writeLong(file.size);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "id=" + id + " version=" + version + " files=" + sourceFiles;
+ }
+
+}
\ No newline at end of file
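
A serialization round trip, as used when tokens are passed between server and client
(sketch only):

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    token.serialize(new DataOutputStream(bytes));

    SessionToken copy = new SessionToken(
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // copy.id, copy.version and copy.sourceFiles now mirror the original token
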
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java b/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java
new file mode 100755
index 00000000000..ebe75ad035c
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java
@@ -0,0 +1,297 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.concurrent.Callable;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.util.EntityUtils;
+import org.apache.lucene.store.AlreadyClosedException;
+
+/**
+ * Base class for HTTP clients.
+ *
+ * @lucene.experimental
+ */
+public abstract class HttpClientBase implements Closeable {
+
+ /**
+ * Default connection timeout for this client, in milliseconds.
+ *
+ * @see #setConnectionTimeout(int)
+ */
+ public static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
+
+ /**
+ * Default socket timeout for this client, in milliseconds.
+ *
+ * @see #setSoTimeout(int)
+ */
+ public static final int DEFAULT_SO_TIMEOUT = 60000;
+
+ // TODO compression?
+
+ /** The URL string to execute requests against. */
+ protected final String url;
+
+ private volatile boolean closed = false;
+
+ private final HttpClient httpc;
+
+ /**
+ * @param conMgr connection manager to use for this http client.
+ * NOTE: the provided {@link ClientConnectionManager} will not be
+ * {@link ClientConnectionManager#shutdown() shut down} by this class.
+ */
+ protected HttpClientBase(String host, int port, String path, ClientConnectionManager conMgr) {
+ url = normalizedURL(host, port, path);
+ httpc = new DefaultHttpClient(conMgr);
+ setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
+ setSoTimeout(DEFAULT_SO_TIMEOUT);
+ }
+
+ /**
+ * Set the connection timeout for this client, in milliseconds. This setting
+ * is used to modify {@link HttpConnectionParams#setConnectionTimeout}.
+ *
+ * @param timeout timeout to set, in milliseconds
+ */
+ public void setConnectionTimeout(int timeout) {
+ HttpConnectionParams.setConnectionTimeout(httpc.getParams(), timeout);
+ }
+
+ /**
+ * Set the socket timeout for this client, in milliseconds. This setting
+ * is used to modify {@link HttpConnectionParams#setSoTimeout}.
+ *
+ * @param timeout timeout to set, in milliseconds
+ */
+ public void setSoTimeout(int timeout) {
+ HttpConnectionParams.setSoTimeout(httpc.getParams(), timeout);
+ }
+
+ /** Throws {@link AlreadyClosedException} if this client is already closed. */
+ protected final void ensureOpen() throws AlreadyClosedException {
+ if (closed) {
+ throw new AlreadyClosedException("HttpClient already closed");
+ }
+ }
+
+ /**
+ * Create a URL out of the given parameters, translate an empty/null path to '/'
+ */
+ private static String normalizedURL(String host, int port, String path) {
+ if (path == null || path.length() == 0) {
+ path = "/";
+ }
+ return "http://" + host + ":" + port + path;
+ }
+
+ /**
+ * Internal: verify the response status after invocation and, in case of an error,
+ * attempt to read the exception sent by the server.
+ */
+ protected void verifyStatus(HttpResponse response) throws IOException {
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() != HttpStatus.SC_OK) {
+ throwKnownError(response, statusLine);
+ }
+ }
+
+ protected void throwKnownError(HttpResponse response, StatusLine statusLine) throws IOException {
+ ObjectInputStream in = null;
+ try {
+ in = new ObjectInputStream(response.getEntity().getContent());
+ } catch (Exception e) {
+ // the response stream is not an exception - could be an error in servlet.init().
+ throw new RuntimeException("Uknown error: " + statusLine);
+ }
+
+ Throwable t;
+ try {
+ t = (Throwable) in.readObject();
+ } catch (Exception e) {
+ //not likely
+ throw new RuntimeException("Failed to read exception object: " + statusLine, e);
+ } finally {
+ in.close();
+ }
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ throw new RuntimeException("unknown exception "+statusLine,t);
+ }
+
+ /**
+ * Internal: execute a POST request and return its result.
+ * The {@code params} argument is treated as: name1,value1,name2,value2,...
+ */
+ protected HttpResponse executePOST(String request, HttpEntity entity, String... params) throws IOException {
+ ensureOpen();
+ HttpPost m = new HttpPost(queryString(request, params));
+ m.setEntity(entity);
+ HttpResponse response = httpc.execute(m);
+ verifyStatus(response);
+ return response;
+ }
+
+ /**
+ * Internal: execute a GET request and return its result.
+ * The {@code params} argument is treated as: name1,value1,name2,value2,...
+ */
+ protected HttpResponse executeGET(String request, String... params) throws IOException {
+ ensureOpen();
+ HttpGet m = new HttpGet(queryString(request, params));
+ HttpResponse response = httpc.execute(m);
+ verifyStatus(response);
+ return response;
+ }
+
+ private String queryString(String request, String... params) throws UnsupportedEncodingException {
+ StringBuilder query = new StringBuilder(url).append('/').append(request).append('?');
+ if (params != null) {
+ for (int i = 0; i < params.length; i += 2) {
+ query.append(params[i]).append('=').append(URLEncoder.encode(params[i+1], "UTF8")).append('&');
+ }
+ }
+ return query.substring(0, query.length() - 1);
+ }
+
+ /** Internal utility: input stream of the provided response */
+ public InputStream responseInputStream(HttpResponse response) throws IOException {
+ return responseInputStream(response, false);
+ }
+
+ // TODO: can we simplify this Consuming !?!?!?
+ /**
+ * Internal utility: input stream of the provided response, which optionally
+ * consumes the response's resources when the input stream is exhausted.
+ */
+ public InputStream responseInputStream(HttpResponse response, boolean consume) throws IOException {
+ final HttpEntity entity = response.getEntity();
+ final InputStream in = entity.getContent();
+ if (!consume) {
+ return in;
+ }
+ return new InputStream() {
+ private boolean consumed = false;
+ @Override
+ public int read() throws IOException {
+ final int res = in.read();
+ consume(res);
+ return res;
+ }
+ @Override
+ public void close() throws IOException {
+ super.close();
+ consume(-1);
+ }
+ @Override
+ public int read(byte[] b) throws IOException {
+ final int res = super.read(b);
+ consume(res);
+ return res;
+ }
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ final int res = super.read(b, off, len);
+ consume(res);
+ return res;
+ }
+ private void consume(int minusOne) {
+ if (!consumed && minusOne==-1) {
+ try {
+ EntityUtils.consume(entity);
+ } catch (Exception e) {
+ // ignored on purpose
+ }
+ consumed = true;
+ }
+ }
+ };
+ }
+
+ /**
+ * Returns true iff this instance was {@link #close() closed}, otherwise
+ * returns false. Note that if you override {@link #close()}, you must call
+ * {@code super.close()}, in order for this instance to be properly closed.
+ */
+ protected final boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * Same as {@link #doAction(HttpResponse, boolean, Callable)} but always consumes the response at the end.
+ */
+ protected <T> T doAction(HttpResponse response, Callable<T> call) throws IOException {
+ return doAction(response, true, call);
+ }
+
+ /**
+ * Execute the given action, then validate that the response status is still OK;
+ * if it is not, attempt to extract the actual server-side exception. Optionally
+ * release the response at exit, depending on the {@code consume} parameter.
+ */
+ protected <T> T doAction(HttpResponse response, boolean consume, Callable<T> call) throws IOException {
+ IOException error = null;
+ try {
+ return call.call();
+ } catch (IOException e) {
+ error = e;
+ } catch (Exception e) {
+ error = new IOException(e);
+ } finally {
+ try {
+ verifyStatus(response);
+ } finally {
+ if (consume) {
+ try {
+ EntityUtils.consume(response.getEntity());
+ } catch (Exception e) {
+ // ignoring on purpose
+ }
+ }
+ }
+ }
+ throw error; // reached only if the status was OK but the action itself failed
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+
+}
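
From within a subclass, the intended usage is to execute a request and then parse its
body inside doAction(), which re-verifies the status and releases the entity afterwards
(sketch; the action name and parameters are hypothetical):

    final HttpResponse response = executeGET("UPDATE", "version", currentVersion);
    Integer firstByte = doAction(response, new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        InputStream in = responseInputStream(response);
        try {
          return in.read(); // parse the response body here
        } finally {
          in.close();
        }
      }
    });
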
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java b/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java
new file mode 100755
index 00000000000..7df96977457
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java
@@ -0,0 +1,105 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.lucene.replicator.Replicator;
+import org.apache.lucene.replicator.Revision;
+import org.apache.lucene.replicator.SessionToken;
+import org.apache.lucene.replicator.http.ReplicationService.ReplicationAction;
+
+/**
+ * An HTTP implementation of {@link Replicator}. Assumes the API supported by
+ * {@link ReplicationService}.
+ *
+ * @lucene.experimental
+ */
+public class HttpReplicator extends HttpClientBase implements Replicator {
+
+ /** Construct with specified connection manager. */
+ public HttpReplicator(String host, int port, String path, ClientConnectionManager conMgr) {
+ super(host, port, path, conMgr);
+ }
+
+ @Override
+ public SessionToken checkForUpdate(String currVersion) throws IOException {
+ String[] params = null;
+ if (currVersion != null) {
+ params = new String[] { ReplicationService.REPLICATE_VERSION_PARAM, currVersion };
+ }
+ final HttpResponse response = executeGET(ReplicationAction.UPDATE.name(), params);
+ return doAction(response, new Callable<SessionToken>() {
+ @Override
+ public SessionToken call() throws Exception {
+ final DataInputStream dis = new DataInputStream(responseInputStream(response));
+ try {
+ if (dis.readByte() == 0) {
+ return null;
+ } else {
+ return new SessionToken(dis);
+ }
+ } finally {
+ dis.close();
+ }
+ }
+ });
+ }
+
+ @Override
+ public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException {
+ String[] params = new String[] {
+ ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID,
+ ReplicationService.REPLICATE_SOURCE_PARAM, source,
+ ReplicationService.REPLICATE_FILENAME_PARAM, fileName,
+ };
+ final HttpResponse response = executeGET(ReplicationAction.OBTAIN.name(), params);
+ return doAction(response, false, new Callable<InputStream>() {
+ @Override
+ public InputStream call() throws Exception {
+ return responseInputStream(response,true);
+ }
+ });
+ }
+
+ @Override
+ public void publish(Revision revision) throws IOException {
+ throw new UnsupportedOperationException(
+ "this replicator implementation does not support remote publishing of revisions");
+ }
+
+ @Override
+ public void release(String sessionID) throws IOException {
+ String[] params = new String[] {
+ ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID
+ };
+ final HttpResponse response = executeGET(ReplicationAction.RELEASE.name(), params);
+ doAction(response, new Callable