mirror of https://github.com/apache/lucene.git
LUCENE-7335: IW's commit data is now late binding
This commit is contained in:
parent
b85c5be6c4
commit
ea9c7ea88c
|
@ -15,6 +15,14 @@ New Features
|
|||
long "sequence number" indicating the effective equivalent
|
||||
single-threaded execution order (Mike McCandless)
|
||||
|
||||
* LUCENE-7335: IndexWriter's commit data is now late binding,
|
||||
recording key/values from a provided iterable based on when the
|
||||
commit actually takes place (Mike McCandless)
|
||||
|
||||
Bug Fixes
|
||||
|
||||
* LUCENE-6662: Fixed potential resource leaks. (Rishabh Patel via Adrien Grand)
|
||||
|
||||
Improvements
|
||||
|
||||
* LUCENE-7323: Compound file writing now verifies the incoming
|
||||
|
|
|
@ -51,7 +51,7 @@ public class CommitIndexTask extends PerfTask {
|
|||
IndexWriter iw = getRunData().getIndexWriter();
|
||||
if (iw != null) {
|
||||
if (commitUserData != null) {
|
||||
iw.setCommitData(commitUserData);
|
||||
iw.setLiveCommitData(commitUserData.entrySet());
|
||||
}
|
||||
iw.commit();
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ public abstract class IndexCommit implements Comparable<IndexCommit> {
|
|||
public abstract long getGeneration();
|
||||
|
||||
/** Returns userData, previously passed to {@link
|
||||
* IndexWriter#setCommitData(Map)} for this commit. Map is
|
||||
* IndexWriter#setLiveCommitData(Iterable)} for this commit. Map is
|
||||
* {@code String -> String}. */
|
||||
public abstract Map<String,String> getUserData() throws IOException;
|
||||
|
||||
|
|
|
@ -174,7 +174,7 @@ public final class IndexUpgrader {
|
|||
infoStream.message(LOG_PREFIX, "All segments upgraded to version " + Version.LATEST);
|
||||
infoStream.message(LOG_PREFIX, "Enforcing commit to rewrite all index metadata...");
|
||||
}
|
||||
w.setCommitData(w.getCommitData()); // fake change to enforce a commit (e.g. if index has no segments)
|
||||
w.setLiveCommitData(w.getLiveCommitData()); // fake change to enforce a commit (e.g. if index has no segments)
|
||||
assert w.hasUncommittedChanges();
|
||||
w.commit();
|
||||
if (infoStream.isEnabled(LOG_PREFIX)) {
|
||||
|
|
|
@ -295,6 +295,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
private volatile boolean closed;
|
||||
private volatile boolean closing;
|
||||
|
||||
private Iterable<Map.Entry<String,String>> commitUserData;
|
||||
|
||||
// Holds all SegmentInfo instances currently involved in
|
||||
// merges
|
||||
private HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
|
||||
|
@ -947,6 +949,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
rollbackSegments = segmentInfos.createBackupSegmentInfos();
|
||||
}
|
||||
|
||||
commitUserData = new HashMap<String,String>(segmentInfos.getUserData()).entrySet();
|
||||
|
||||
pendingNumDocs.set(segmentInfos.totalMaxDoc());
|
||||
|
||||
// start with previous field numbers, but new FieldInfos
|
||||
|
@ -2997,6 +3001,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
segmentInfos.changed();
|
||||
}
|
||||
|
||||
if (commitUserData != null) {
|
||||
Map<String,String> userData = new HashMap<>();
|
||||
for(Map.Entry<String,String> ent : commitUserData) {
|
||||
userData.put(ent.getKey(), ent.getValue());
|
||||
}
|
||||
segmentInfos.setUserData(userData, false);
|
||||
}
|
||||
|
||||
// Must clone the segmentInfos while we still
|
||||
// hold fullFlushLock and while sync'd so that
|
||||
// no partial changes (eg a delete w/o
|
||||
|
@ -3011,7 +3023,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
// we are trying to sync all referenced files, a
|
||||
// merge completes which would otherwise have
|
||||
// removed the files we are now syncing.
|
||||
filesToCommit = toCommit.files(false);
|
||||
filesToCommit = toCommit.files(false);
|
||||
deleter.incRef(filesToCommit);
|
||||
}
|
||||
success = true;
|
||||
|
@ -3059,38 +3071,65 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Sets the commit user data map. That method is considered a transaction by
|
||||
* {@link IndexWriter} and will be {@link #commit() committed} even if no other
|
||||
* changes were made to the writer instance. Note that you must call this method
|
||||
* before {@link #prepareCommit()}, or otherwise it won't be included in the
|
||||
* Sets the iterator to provide the commit user data map at commit time. Calling this method
|
||||
* is considered a committable change and will be {@link #commit() committed} even if
|
||||
* there are no other changes this writer. Note that you must call this method
|
||||
* before {@link #prepareCommit()}. Otherwise it won't be included in the
|
||||
* follow-on {@link #commit()}.
|
||||
* <p>
|
||||
* <b>NOTE:</b> the map is cloned internally, therefore altering the map's
|
||||
* contents after calling this method has no effect.
|
||||
* <b>NOTE:</b> the iterator is late-binding: it is only visited once all documents for the
|
||||
* commit have been written to their segments, before the next segments_N file is written
|
||||
*/
|
||||
public final synchronized void setLiveCommitData(Iterable<Map.Entry<String,String>> commitUserData) {
|
||||
setLiveCommitData(commitUserData, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the commit user data map.
|
||||
*
|
||||
* @deprecated Use {@link #setLiveCommitData} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public final synchronized void setCommitData(Map<String,String> commitUserData) {
|
||||
setCommitData(commitUserData, true);
|
||||
setLiveCommitData(new HashMap<>(commitUserData).entrySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the commit user data map, controlling whether to advance the {@link SegmentInfos#getVersion}.
|
||||
* Sets the commit user data iterator, controlling whether to advance the {@link SegmentInfos#getVersion}.
|
||||
*
|
||||
* @see #setCommitData(Map)
|
||||
* @see #setLiveCommitData(Iterable)
|
||||
*
|
||||
* @lucene.internal */
|
||||
public final synchronized void setCommitData(Map<String,String> commitUserData, boolean doIncrementVersion) {
|
||||
segmentInfos.setUserData(new HashMap<>(commitUserData), doIncrementVersion);
|
||||
public final synchronized void setLiveCommitData(Iterable<Map.Entry<String,String>> commitUserData, boolean doIncrementVersion) {
|
||||
this.commitUserData = commitUserData;
|
||||
if (doIncrementVersion) {
|
||||
segmentInfos.changed();
|
||||
}
|
||||
changeCount.incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the commit user data iterable previously set with {@link #setLiveCommitData(Iterable)}, or null if nothing has been set yet.
|
||||
*/
|
||||
public final synchronized Iterable<Map.Entry<String,String>> getLiveCommitData() {
|
||||
return commitUserData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the commit user data map that was last committed, or the one that
|
||||
* was set on {@link #setCommitData(Map)}.
|
||||
*
|
||||
* @deprecated Use {@link #getLiveCommitData} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public final synchronized Map<String,String> getCommitData() {
|
||||
return segmentInfos.getUserData();
|
||||
Map<String,String> data = new HashMap<>();
|
||||
for(Map.Entry<String,String> ent : commitUserData) {
|
||||
data.put(ent.getKey(), ent.getValue());
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
|
||||
// Used only by commit and prepareCommit, below; lock
|
||||
// order is commitLock -> IW
|
||||
private final Object commitLock = new Object();
|
||||
|
|
|
@ -17,6 +17,20 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.lucene.codecs.Codec;
|
||||
import org.apache.lucene.codecs.CodecUtil;
|
||||
import org.apache.lucene.codecs.DocValuesFormat;
|
||||
|
@ -32,20 +46,6 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.apache.lucene.util.StringHelper;
|
||||
import org.apache.lucene.util.Version;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A collection of segmentInfo objects with methods for operating on those
|
||||
* segments in relation to the file system.
|
||||
|
@ -103,7 +103,7 @@ import java.util.Set;
|
|||
* <li>SegID is the identifier of the Codec that encoded this segment. </li>
|
||||
* <li>CommitUserData stores an optional user-supplied opaque
|
||||
* Map<String,String> that was passed to
|
||||
* {@link IndexWriter#setCommitData(java.util.Map)}.</li>
|
||||
* {@link IndexWriter#setLiveCommitData(Iterable)}.</li>
|
||||
* <li>FieldInfosGen is the generation count of the fieldInfos file. If this is
|
||||
* -1, there are no updates to the fieldInfos in that segment. Anything above
|
||||
* zero means there are updates to fieldInfos stored by {@link FieldInfosFormat}
|
||||
|
@ -453,7 +453,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||
String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS,
|
||||
"",
|
||||
nextGeneration);
|
||||
|
||||
|
||||
// Always advance the generation on write:
|
||||
generation = nextGeneration;
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ public class TestDeletionPolicy extends LuceneTestCase {
|
|||
ExpirationTimeDeletionPolicy policy = (ExpirationTimeDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
|
||||
Map<String,String> commitData = new HashMap<>();
|
||||
commitData.put("commitTime", String.valueOf(System.currentTimeMillis()));
|
||||
writer.setCommitData(commitData);
|
||||
writer.setLiveCommitData(commitData.entrySet());
|
||||
writer.commit();
|
||||
writer.close();
|
||||
|
||||
|
@ -251,7 +251,7 @@ public class TestDeletionPolicy extends LuceneTestCase {
|
|||
}
|
||||
commitData = new HashMap<>();
|
||||
commitData.put("commitTime", String.valueOf(System.currentTimeMillis()));
|
||||
writer.setCommitData(commitData);
|
||||
writer.setLiveCommitData(commitData.entrySet());
|
||||
writer.commit();
|
||||
writer.close();
|
||||
|
||||
|
|
|
@ -557,14 +557,14 @@ public class TestDirectoryReaderReopen extends LuceneTestCase {
|
|||
writer.addDocument(doc);
|
||||
Map<String,String> data = new HashMap<>();
|
||||
data.put("index", i+"");
|
||||
writer.setCommitData(data);
|
||||
writer.setLiveCommitData(data.entrySet());
|
||||
writer.commit();
|
||||
}
|
||||
for(int i=0;i<4;i++) {
|
||||
writer.deleteDocuments(new Term("id", ""+i));
|
||||
Map<String,String> data = new HashMap<>();
|
||||
data.put("index", (4+i)+"");
|
||||
writer.setCommitData(data);
|
||||
writer.setLiveCommitData(data.entrySet());
|
||||
writer.commit();
|
||||
}
|
||||
writer.close();
|
||||
|
|
|
@ -1892,9 +1892,9 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||
writer.commit(); // first commit to complete IW create transaction.
|
||||
|
||||
// this should store the commit data, even though no other changes were made
|
||||
writer.setCommitData(new HashMap<String,String>() {{
|
||||
writer.setLiveCommitData(new HashMap<String,String>() {{
|
||||
put("key", "value");
|
||||
}});
|
||||
}}.entrySet());
|
||||
writer.commit();
|
||||
|
||||
DirectoryReader r = DirectoryReader.open(dir);
|
||||
|
@ -1902,13 +1902,13 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||
r.close();
|
||||
|
||||
// now check setCommitData and prepareCommit/commit sequence
|
||||
writer.setCommitData(new HashMap<String,String>() {{
|
||||
writer.setLiveCommitData(new HashMap<String,String>() {{
|
||||
put("key", "value1");
|
||||
}});
|
||||
}}.entrySet());
|
||||
writer.prepareCommit();
|
||||
writer.setCommitData(new HashMap<String,String>() {{
|
||||
writer.setLiveCommitData(new HashMap<String,String>() {{
|
||||
put("key", "value2");
|
||||
}});
|
||||
}}.entrySet());
|
||||
writer.commit(); // should commit the first commitData only, per protocol
|
||||
|
||||
r = DirectoryReader.open(dir);
|
||||
|
@ -1926,21 +1926,32 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||
writer.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
private Map<String,String> getLiveCommitData(IndexWriter writer) {
|
||||
Map<String,String> data = new HashMap<>();
|
||||
Iterable<Map.Entry<String,String>> iter = writer.getLiveCommitData();
|
||||
if (iter != null) {
|
||||
for(Map.Entry<String,String> ent : iter) {
|
||||
data.put(ent.getKey(), ent.getValue());
|
||||
}
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCommitData() throws Exception {
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(null));
|
||||
writer.setCommitData(new HashMap<String,String>() {{
|
||||
writer.setLiveCommitData(new HashMap<String,String>() {{
|
||||
put("key", "value");
|
||||
}});
|
||||
assertEquals("value", writer.getCommitData().get("key"));
|
||||
}}.entrySet());
|
||||
assertEquals("value", getLiveCommitData(writer).get("key"));
|
||||
writer.close();
|
||||
|
||||
// validate that it's also visible when opening a new IndexWriter
|
||||
writer = new IndexWriter(dir, newIndexWriterConfig(null)
|
||||
.setOpenMode(OpenMode.APPEND));
|
||||
assertEquals("value", writer.getCommitData().get("key"));
|
||||
assertEquals("value", getLiveCommitData(writer).get("key"));
|
||||
writer.close();
|
||||
|
||||
dir.close();
|
||||
|
@ -2650,9 +2661,9 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||
DirectoryReader r = DirectoryReader.open(w);
|
||||
Map<String,String> m = new HashMap<>();
|
||||
m.put("foo", "bar");
|
||||
w.setCommitData(m);
|
||||
w.setLiveCommitData(m.entrySet());
|
||||
|
||||
// setCommitData with no other changes should count as an NRT change:
|
||||
// setLiveCommitData with no other changes should count as an NRT change:
|
||||
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
|
||||
assertNotNull(r2);
|
||||
|
||||
|
@ -2669,9 +2680,9 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||
DirectoryReader r = DirectoryReader.open(w);
|
||||
Map<String,String> m = new HashMap<>();
|
||||
m.put("foo", "bar");
|
||||
w.setCommitData(m);
|
||||
w.setLiveCommitData(m.entrySet());
|
||||
w.commit();
|
||||
// setCommitData and also commit, with no other changes, should count as an NRT change:
|
||||
// setLiveCommitData and also commit, with no other changes, should count as an NRT change:
|
||||
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
|
||||
assertNotNull(r2);
|
||||
IOUtils.close(r, r2, w, dir);
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.lucene.index;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -421,13 +422,13 @@ public class TestIndexWriterCommit extends LuceneTestCase {
|
|||
// commit to "first"
|
||||
Map<String,String> commitData = new HashMap<>();
|
||||
commitData.put("tag", "first");
|
||||
w.setCommitData(commitData);
|
||||
w.setLiveCommitData(commitData.entrySet());
|
||||
w.commit();
|
||||
|
||||
// commit to "second"
|
||||
w.addDocument(doc);
|
||||
commitData.put("tag", "second");
|
||||
w.setCommitData(commitData);
|
||||
w.setLiveCommitData(commitData.entrySet());
|
||||
w.close();
|
||||
|
||||
// open "first" with IndexWriter
|
||||
|
@ -450,7 +451,7 @@ public class TestIndexWriterCommit extends LuceneTestCase {
|
|||
// commit IndexWriter to "third"
|
||||
w.addDocument(doc);
|
||||
commitData.put("tag", "third");
|
||||
w.setCommitData(commitData);
|
||||
w.setLiveCommitData(commitData.entrySet());
|
||||
w.close();
|
||||
|
||||
// make sure "second" commit is still there
|
||||
|
@ -632,7 +633,7 @@ public class TestIndexWriterCommit extends LuceneTestCase {
|
|||
TestIndexWriter.addDoc(w);
|
||||
Map<String,String> data = new HashMap<>();
|
||||
data.put("label", "test1");
|
||||
w.setCommitData(data);
|
||||
w.setLiveCommitData(data.entrySet());
|
||||
w.close();
|
||||
|
||||
r = DirectoryReader.open(dir);
|
||||
|
@ -663,4 +664,32 @@ public class TestIndexWriterCommit extends LuceneTestCase {
|
|||
r.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
// LUCENE-7335: make sure commit data is late binding
|
||||
public void testCommitDataIsLive() throws Exception {
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
|
||||
w.addDocument(new Document());
|
||||
|
||||
final Map<String,String> commitData = new HashMap<>();
|
||||
commitData.put("foo", "bar");
|
||||
|
||||
// make sure "foo" / "bar" doesn't take
|
||||
w.setLiveCommitData(commitData.entrySet());
|
||||
|
||||
commitData.clear();
|
||||
commitData.put("boo", "baz");
|
||||
|
||||
// this finally does the commit, and should burn "boo" / "baz"
|
||||
w.close();
|
||||
|
||||
List<IndexCommit> commits = DirectoryReader.listCommits(dir);
|
||||
assertEquals(1, commits.size());
|
||||
|
||||
IndexCommit commit = commits.get(0);
|
||||
Map<String,String> data = commit.getUserData();
|
||||
assertEquals(1, data.size());
|
||||
assertEquals("baz", data.get("boo"));
|
||||
dir.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ public class TestTransactionRollback extends LuceneTestCase {
|
|||
.setIndexCommit(last));
|
||||
Map<String,String> data = new HashMap<>();
|
||||
data.put("index", "Rolled back to 1-"+id);
|
||||
w.setCommitData(data);
|
||||
w.setLiveCommitData(data.entrySet());
|
||||
w.close();
|
||||
}
|
||||
|
||||
|
@ -142,7 +142,7 @@ public class TestTransactionRollback extends LuceneTestCase {
|
|||
if (currentRecordId%10 == 0) {
|
||||
Map<String,String> data = new HashMap<>();
|
||||
data.put("index", "records 1-"+currentRecordId);
|
||||
w.setCommitData(data);
|
||||
w.setLiveCommitData(data.entrySet());
|
||||
w.commit();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -205,7 +205,7 @@ public abstract class TaxonomyReader implements Closeable {
|
|||
/**
|
||||
* Retrieve user committed data.
|
||||
*
|
||||
* @see TaxonomyWriter#setCommitData(Map)
|
||||
* @see TaxonomyWriter#setLiveCommitData(Iterable)
|
||||
*/
|
||||
public abstract Map<String, String> getCommitUserData() throws IOException;
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.TwoPhaseCommit;
|
||||
|
||||
/**
|
||||
|
@ -105,19 +106,14 @@ public interface TaxonomyWriter extends Closeable, TwoPhaseCommit {
|
|||
public int getSize();
|
||||
|
||||
/**
|
||||
* Sets the commit user data map. That method is considered a transaction and
|
||||
* will be {@link #commit() committed} even if no other changes were made to
|
||||
* the writer instance.
|
||||
* <p>
|
||||
* <b>NOTE:</b> the map is cloned internally, therefore altering the map's
|
||||
* contents after calling this method has no effect.
|
||||
* Sets the commit user data iterable. See {@link IndexWriter#setLiveCommitData}.
|
||||
*/
|
||||
public void setCommitData(Map<String,String> commitUserData);
|
||||
public void setLiveCommitData(Iterable<Map.Entry<String,String>> commitUserData);
|
||||
|
||||
/**
|
||||
* Returns the commit user data map that was set on
|
||||
* {@link #setCommitData(Map)}.
|
||||
* Returns the commit user data iterable that was set on
|
||||
* {@link #setLiveCommitData(Iterable)}.
|
||||
*/
|
||||
public Map<String,String> getCommitData();
|
||||
public Iterable<Map.Entry<String,String>> getLiveCommitData();
|
||||
|
||||
}
|
||||
|
|
|
@ -584,31 +584,42 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
|
|||
public synchronized long commit() throws IOException {
|
||||
ensureOpen();
|
||||
// LUCENE-4972: if we always call setCommitData, we create empty commits
|
||||
String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH);
|
||||
|
||||
Map<String,String> data = new HashMap<>();
|
||||
Iterable<Map.Entry<String,String>> iter = indexWriter.getLiveCommitData();
|
||||
if (iter != null) {
|
||||
for(Map.Entry<String,String> ent : iter) {
|
||||
data.put(ent.getKey(), ent.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
String epochStr = data.get(INDEX_EPOCH);
|
||||
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
|
||||
indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData()));
|
||||
indexWriter.setLiveCommitData(combinedCommitData(indexWriter.getLiveCommitData()));
|
||||
}
|
||||
return indexWriter.commit();
|
||||
}
|
||||
|
||||
/** Combine original user data with the taxonomy epoch. */
|
||||
private Map<String,String> combinedCommitData(Map<String,String> commitData) {
|
||||
private Iterable<Map.Entry<String,String>> combinedCommitData(Iterable<Map.Entry<String,String>> commitData) {
|
||||
Map<String,String> m = new HashMap<>();
|
||||
if (commitData != null) {
|
||||
m.putAll(commitData);
|
||||
for(Map.Entry<String,String> ent : commitData) {
|
||||
m.put(ent.getKey(), ent.getValue());
|
||||
}
|
||||
}
|
||||
m.put(INDEX_EPOCH, Long.toString(indexEpoch, 16));
|
||||
return m;
|
||||
return m.entrySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCommitData(Map<String,String> commitUserData) {
|
||||
indexWriter.setCommitData(combinedCommitData(commitUserData));
|
||||
public void setLiveCommitData(Iterable<Map.Entry<String,String>> commitUserData) {
|
||||
indexWriter.setLiveCommitData(combinedCommitData(commitUserData));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String,String> getCommitData() {
|
||||
return combinedCommitData(indexWriter.getCommitData());
|
||||
public Iterable<Map.Entry<String,String>> getLiveCommitData() {
|
||||
return combinedCommitData(indexWriter.getLiveCommitData());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -619,9 +630,16 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
|
|||
public synchronized long prepareCommit() throws IOException {
|
||||
ensureOpen();
|
||||
// LUCENE-4972: if we always call setCommitData, we create empty commits
|
||||
String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH);
|
||||
Map<String,String> data = new HashMap<>();
|
||||
Iterable<Map.Entry<String,String>> iter = indexWriter.getLiveCommitData();
|
||||
if (iter != null) {
|
||||
for(Map.Entry<String,String> ent : iter) {
|
||||
data.put(ent.getKey(), ent.getValue());
|
||||
}
|
||||
}
|
||||
String epochStr = data.get(INDEX_EPOCH);
|
||||
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
|
||||
indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData()));
|
||||
indexWriter.setLiveCommitData(combinedCommitData(indexWriter.getLiveCommitData()));
|
||||
}
|
||||
return indexWriter.prepareCommit();
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ public class TestDirectoryTaxonomyWriter extends FacetTestCase {
|
|||
taxoWriter.addCategory(new FacetLabel("b"));
|
||||
Map<String, String> userCommitData = new HashMap<>();
|
||||
userCommitData.put("testing", "1 2 3");
|
||||
taxoWriter.setCommitData(userCommitData);
|
||||
taxoWriter.setLiveCommitData(userCommitData.entrySet());
|
||||
taxoWriter.close();
|
||||
DirectoryReader r = DirectoryReader.open(dir);
|
||||
assertEquals("2 categories plus root should have been committed to the underlying directory", 3, r.numDocs());
|
||||
|
@ -109,14 +109,22 @@ public class TestDirectoryTaxonomyWriter extends FacetTestCase {
|
|||
// that the taxonomy index has been recreated.
|
||||
taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE_OR_APPEND, NO_OP_CACHE);
|
||||
taxoWriter.addCategory(new FacetLabel("c")); // add a category so that commit will happen
|
||||
taxoWriter.setCommitData(new HashMap<String, String>(){{
|
||||
taxoWriter.setLiveCommitData(new HashMap<String, String>(){{
|
||||
put("just", "data");
|
||||
}});
|
||||
}}.entrySet());
|
||||
taxoWriter.commit();
|
||||
|
||||
// verify taxoWriter.getCommitData()
|
||||
Map<String,String> data = new HashMap<>();
|
||||
Iterable<Map.Entry<String,String>> iter = taxoWriter.getLiveCommitData();
|
||||
if (iter != null) {
|
||||
for(Map.Entry<String,String> ent : iter) {
|
||||
data.put(ent.getKey(), ent.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
assertNotNull(DirectoryTaxonomyWriter.INDEX_EPOCH
|
||||
+ " not found in taoxWriter.commitData", taxoWriter.getCommitData().get(DirectoryTaxonomyWriter.INDEX_EPOCH));
|
||||
+ " not found in taoxWriter.commitData", data.get(DirectoryTaxonomyWriter.INDEX_EPOCH));
|
||||
taxoWriter.close();
|
||||
|
||||
r = DirectoryReader.open(dir);
|
||||
|
@ -170,9 +178,9 @@ public class TestDirectoryTaxonomyWriter extends FacetTestCase {
|
|||
|
||||
private void touchTaxo(DirectoryTaxonomyWriter taxoWriter, FacetLabel cp) throws IOException {
|
||||
taxoWriter.addCategory(cp);
|
||||
taxoWriter.setCommitData(new HashMap<String, String>(){{
|
||||
taxoWriter.setLiveCommitData(new HashMap<String, String>(){{
|
||||
put("just", "data");
|
||||
}});
|
||||
}}.entrySet());
|
||||
taxoWriter.commit();
|
||||
}
|
||||
|
||||
|
|
|
@ -83,10 +83,16 @@ public abstract class PrimaryNode extends Node {
|
|||
|
||||
message("IWC:\n" + writer.getConfig());
|
||||
message("dir:\n" + writer.getDirectory());
|
||||
message("commitData: " + writer.getCommitData());
|
||||
message("commitData: " + writer.getLiveCommitData());
|
||||
|
||||
// Record our primaryGen in the userData, and set initial version to 0:
|
||||
Map<String,String> commitData = new HashMap<>(writer.getCommitData());
|
||||
Map<String,String> commitData = new HashMap<>();
|
||||
Iterable<Map.Entry<String,String>> iter = writer.getLiveCommitData();
|
||||
if (iter != null) {
|
||||
for(Map.Entry<String,String> ent : iter) {
|
||||
commitData.put(ent.getKey(), ent.getValue());
|
||||
}
|
||||
}
|
||||
commitData.put(PRIMARY_GEN_KEY, Long.toString(primaryGen));
|
||||
if (commitData.get(VERSION_KEY) == null) {
|
||||
commitData.put(VERSION_KEY, "0");
|
||||
|
@ -94,7 +100,7 @@ public abstract class PrimaryNode extends Node {
|
|||
} else {
|
||||
message("keep current commitData version=" + commitData.get(VERSION_KEY));
|
||||
}
|
||||
writer.setCommitData(commitData, false);
|
||||
writer.setLiveCommitData(commitData.entrySet(), false);
|
||||
|
||||
// We forcefully advance the SIS version to an unused future version. This is necessary if the previous primary crashed and we are
|
||||
// starting up on an "older" index, else versions can be illegally reused but show different results:
|
||||
|
@ -153,10 +159,16 @@ public abstract class PrimaryNode extends Node {
|
|||
}
|
||||
|
||||
public synchronized long getLastCommitVersion() {
|
||||
String s = curInfos.getUserData().get(VERSION_KEY);
|
||||
Iterable<Map.Entry<String,String>> iter = writer.getLiveCommitData();
|
||||
assert iter != null;
|
||||
for(Map.Entry<String,String> ent : iter) {
|
||||
if (ent.getKey().equals(VERSION_KEY)) {
|
||||
return Long.parseLong(ent.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
// In ctor we always install an initial version:
|
||||
assert s != null;
|
||||
return Long.parseLong(s);
|
||||
throw new AssertionError("missing VERSION_KEY");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -167,7 +179,7 @@ public abstract class PrimaryNode extends Node {
|
|||
// on xlog replay we are replaying more ops than necessary.
|
||||
commitData.put(VERSION_KEY, Long.toString(copyState.version));
|
||||
message("top: commit commitData=" + commitData);
|
||||
writer.setCommitData(commitData, false);
|
||||
writer.setLiveCommitData(commitData.entrySet(), false);
|
||||
writer.commit();
|
||||
}
|
||||
|
||||
|
|
|
@ -172,9 +172,9 @@ public class IndexAndTaxonomyReplicationClientTest extends ReplicatorTestCase {
|
|||
|
||||
private Revision createRevision(final int id) throws IOException {
|
||||
publishIndexWriter.addDocument(newDocument(publishTaxoWriter, id));
|
||||
publishIndexWriter.setCommitData(new HashMap<String, String>() {{
|
||||
publishIndexWriter.setLiveCommitData(new HashMap<String, String>() {{
|
||||
put(VERSION_ID, Integer.toString(id, 16));
|
||||
}});
|
||||
}}.entrySet());
|
||||
publishIndexWriter.commit();
|
||||
publishTaxoWriter.commit();
|
||||
return new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter);
|
||||
|
|
|
@ -122,9 +122,9 @@ public class IndexReplicationClientTest extends ReplicatorTestCase {
|
|||
|
||||
private Revision createRevision(final int id) throws IOException {
|
||||
publishWriter.addDocument(new Document());
|
||||
publishWriter.setCommitData(new HashMap<String, String>() {{
|
||||
publishWriter.setLiveCommitData(new HashMap<String, String>() {{
|
||||
put(VERSION_ID, Integer.toString(id, 16));
|
||||
}});
|
||||
}}.entrySet());
|
||||
publishWriter.commit();
|
||||
return new IndexRevision(publishWriter);
|
||||
}
|
||||
|
|
|
@ -65,9 +65,9 @@ public class LocalReplicatorTest extends ReplicatorTestCase {
|
|||
|
||||
private Revision createRevision(final int id) throws IOException {
|
||||
sourceWriter.addDocument(new Document());
|
||||
sourceWriter.setCommitData(new HashMap<String, String>() {{
|
||||
sourceWriter.setLiveCommitData(new HashMap<String, String>() {{
|
||||
put(VERSION_ID, Integer.toString(id, 16));
|
||||
}});
|
||||
}}.entrySet());
|
||||
sourceWriter.commit();
|
||||
return new IndexRevision(sourceWriter);
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ public class HttpReplicatorTest extends ReplicatorTestCase {
|
|||
private void publishRevision(int id) throws IOException {
|
||||
Document doc = new Document();
|
||||
writer.addDocument(doc);
|
||||
writer.setCommitData(Collections.singletonMap("ID", Integer.toString(id, 16)));
|
||||
writer.setLiveCommitData(Collections.singletonMap("ID", Integer.toString(id, 16)).entrySet());
|
||||
writer.commit();
|
||||
serverReplicator.publish(new IndexRevision(writer));
|
||||
}
|
||||
|
|
|
@ -523,7 +523,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
|
|||
final Map<String,String> commitData = new HashMap<>();
|
||||
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
|
||||
String.valueOf(System.currentTimeMillis()));
|
||||
iw.setCommitData(commitData);
|
||||
iw.setLiveCommitData(commitData.entrySet());
|
||||
}
|
||||
|
||||
public void prepareCommit(CommitUpdateCommand cmd) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue