mirror of
https://github.com/apache/lucene.git
synced 2025-02-28 13:29:26 +00:00
LUCENE-7335: IW's commit data is now late binding
This commit is contained in:
parent
a4455a4b14
commit
55fc01bc45
@ -23,6 +23,10 @@ New Features
|
|||||||
long "sequence number" indicating the effective equivalent
|
long "sequence number" indicating the effective equivalent
|
||||||
single-threaded execution order (Mike McCandless)
|
single-threaded execution order (Mike McCandless)
|
||||||
|
|
||||||
|
* LUCENE-7335: IndexWriter's commit data is now late binding,
|
||||||
|
recording key/values from a provided iterable based on when the
|
||||||
|
commit actually takes place (Mike McCandless)
|
||||||
|
|
||||||
Bug Fixes
|
Bug Fixes
|
||||||
|
|
||||||
* LUCENE-6662: Fixed potential resource leaks. (Rishabh Patel via Adrien Grand)
|
* LUCENE-6662: Fixed potential resource leaks. (Rishabh Patel via Adrien Grand)
|
||||||
|
@ -51,7 +51,7 @@ public class CommitIndexTask extends PerfTask {
|
|||||||
IndexWriter iw = getRunData().getIndexWriter();
|
IndexWriter iw = getRunData().getIndexWriter();
|
||||||
if (iw != null) {
|
if (iw != null) {
|
||||||
if (commitUserData != null) {
|
if (commitUserData != null) {
|
||||||
iw.setCommitData(commitUserData);
|
iw.setLiveCommitData(commitUserData.entrySet());
|
||||||
}
|
}
|
||||||
iw.commit();
|
iw.commit();
|
||||||
}
|
}
|
||||||
|
@ -108,7 +108,7 @@ public abstract class IndexCommit implements Comparable<IndexCommit> {
|
|||||||
public abstract long getGeneration();
|
public abstract long getGeneration();
|
||||||
|
|
||||||
/** Returns userData, previously passed to {@link
|
/** Returns userData, previously passed to {@link
|
||||||
* IndexWriter#setCommitData(Map)} for this commit. Map is
|
* IndexWriter#setLiveCommitData(Iterable)} for this commit. Map is
|
||||||
* {@code String -> String}. */
|
* {@code String -> String}. */
|
||||||
public abstract Map<String,String> getUserData() throws IOException;
|
public abstract Map<String,String> getUserData() throws IOException;
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ public final class IndexUpgrader {
|
|||||||
infoStream.message(LOG_PREFIX, "All segments upgraded to version " + Version.LATEST);
|
infoStream.message(LOG_PREFIX, "All segments upgraded to version " + Version.LATEST);
|
||||||
infoStream.message(LOG_PREFIX, "Enforcing commit to rewrite all index metadata...");
|
infoStream.message(LOG_PREFIX, "Enforcing commit to rewrite all index metadata...");
|
||||||
}
|
}
|
||||||
w.setCommitData(w.getCommitData()); // fake change to enforce a commit (e.g. if index has no segments)
|
w.setLiveCommitData(w.getLiveCommitData()); // fake change to enforce a commit (e.g. if index has no segments)
|
||||||
assert w.hasUncommittedChanges();
|
assert w.hasUncommittedChanges();
|
||||||
w.commit();
|
w.commit();
|
||||||
if (infoStream.isEnabled(LOG_PREFIX)) {
|
if (infoStream.isEnabled(LOG_PREFIX)) {
|
||||||
|
@ -295,6 +295,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||||||
private volatile boolean closed;
|
private volatile boolean closed;
|
||||||
private volatile boolean closing;
|
private volatile boolean closing;
|
||||||
|
|
||||||
|
private Iterable<Map.Entry<String,String>> commitUserData;
|
||||||
|
|
||||||
// Holds all SegmentInfo instances currently involved in
|
// Holds all SegmentInfo instances currently involved in
|
||||||
// merges
|
// merges
|
||||||
private HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
|
private HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
|
||||||
@ -947,6 +949,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||||||
rollbackSegments = segmentInfos.createBackupSegmentInfos();
|
rollbackSegments = segmentInfos.createBackupSegmentInfos();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
commitUserData = new HashMap<String,String>(segmentInfos.getUserData()).entrySet();
|
||||||
|
|
||||||
pendingNumDocs.set(segmentInfos.totalMaxDoc());
|
pendingNumDocs.set(segmentInfos.totalMaxDoc());
|
||||||
|
|
||||||
// start with previous field numbers, but new FieldInfos
|
// start with previous field numbers, but new FieldInfos
|
||||||
@ -2997,6 +3001,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||||||
segmentInfos.changed();
|
segmentInfos.changed();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (commitUserData != null) {
|
||||||
|
Map<String,String> userData = new HashMap<>();
|
||||||
|
for(Map.Entry<String,String> ent : commitUserData) {
|
||||||
|
userData.put(ent.getKey(), ent.getValue());
|
||||||
|
}
|
||||||
|
segmentInfos.setUserData(userData, false);
|
||||||
|
}
|
||||||
|
|
||||||
// Must clone the segmentInfos while we still
|
// Must clone the segmentInfos while we still
|
||||||
// hold fullFlushLock and while sync'd so that
|
// hold fullFlushLock and while sync'd so that
|
||||||
// no partial changes (eg a delete w/o
|
// no partial changes (eg a delete w/o
|
||||||
@ -3011,7 +3023,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||||||
// we are trying to sync all referenced files, a
|
// we are trying to sync all referenced files, a
|
||||||
// merge completes which would otherwise have
|
// merge completes which would otherwise have
|
||||||
// removed the files we are now syncing.
|
// removed the files we are now syncing.
|
||||||
filesToCommit = toCommit.files(false);
|
filesToCommit = toCommit.files(false);
|
||||||
deleter.incRef(filesToCommit);
|
deleter.incRef(filesToCommit);
|
||||||
}
|
}
|
||||||
success = true;
|
success = true;
|
||||||
@ -3059,36 +3071,38 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the commit user data map. That method is considered a transaction by
|
* Sets the iterator to provide the commit user data map at commit time. Calling this method
|
||||||
* {@link IndexWriter} and will be {@link #commit() committed} even if no other
|
* is considered a committable change and will be {@link #commit() committed} even if
|
||||||
* changes were made to the writer instance. Note that you must call this method
|
* there are no other changes this writer. Note that you must call this method
|
||||||
* before {@link #prepareCommit()}, or otherwise it won't be included in the
|
* before {@link #prepareCommit()}. Otherwise it won't be included in the
|
||||||
* follow-on {@link #commit()}.
|
* follow-on {@link #commit()}.
|
||||||
* <p>
|
* <p>
|
||||||
* <b>NOTE:</b> the map is cloned internally, therefore altering the map's
|
* <b>NOTE:</b> the iterator is late-binding: it is only visited once all documents for the
|
||||||
* contents after calling this method has no effect.
|
* commit have been written to their segments, before the next segments_N file is written
|
||||||
*/
|
*/
|
||||||
public final synchronized void setCommitData(Map<String,String> commitUserData) {
|
public final synchronized void setLiveCommitData(Iterable<Map.Entry<String,String>> commitUserData) {
|
||||||
setCommitData(commitUserData, true);
|
setLiveCommitData(commitUserData, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the commit user data map, controlling whether to advance the {@link SegmentInfos#getVersion}.
|
* Sets the commit user data iterator, controlling whether to advance the {@link SegmentInfos#getVersion}.
|
||||||
*
|
*
|
||||||
* @see #setCommitData(Map)
|
* @see #setLiveCommitData(Iterable)
|
||||||
*
|
*
|
||||||
* @lucene.internal */
|
* @lucene.internal */
|
||||||
public final synchronized void setCommitData(Map<String,String> commitUserData, boolean doIncrementVersion) {
|
public final synchronized void setLiveCommitData(Iterable<Map.Entry<String,String>> commitUserData, boolean doIncrementVersion) {
|
||||||
segmentInfos.setUserData(new HashMap<>(commitUserData), doIncrementVersion);
|
this.commitUserData = commitUserData;
|
||||||
|
if (doIncrementVersion) {
|
||||||
|
segmentInfos.changed();
|
||||||
|
}
|
||||||
changeCount.incrementAndGet();
|
changeCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the commit user data map that was last committed, or the one that
|
* Returns the commit user data iterable previously set with {@link #setLiveCommitData(Iterable)}, or null if nothing has been set yet.
|
||||||
* was set on {@link #setCommitData(Map)}.
|
|
||||||
*/
|
*/
|
||||||
public final synchronized Map<String,String> getCommitData() {
|
public final synchronized Iterable<Map.Entry<String,String>> getLiveCommitData() {
|
||||||
return segmentInfos.getUserData();
|
return commitUserData;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used only by commit and prepareCommit, below; lock
|
// Used only by commit and prepareCommit, below; lock
|
||||||
|
@ -17,6 +17,20 @@
|
|||||||
package org.apache.lucene.index;
|
package org.apache.lucene.index;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.lucene.codecs.Codec;
|
import org.apache.lucene.codecs.Codec;
|
||||||
import org.apache.lucene.codecs.CodecUtil;
|
import org.apache.lucene.codecs.CodecUtil;
|
||||||
import org.apache.lucene.codecs.DocValuesFormat;
|
import org.apache.lucene.codecs.DocValuesFormat;
|
||||||
@ -32,20 +46,6 @@ import org.apache.lucene.util.IOUtils;
|
|||||||
import org.apache.lucene.util.StringHelper;
|
import org.apache.lucene.util.StringHelper;
|
||||||
import org.apache.lucene.util.Version;
|
import org.apache.lucene.util.Version;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.PrintStream;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A collection of segmentInfo objects with methods for operating on those
|
* A collection of segmentInfo objects with methods for operating on those
|
||||||
* segments in relation to the file system.
|
* segments in relation to the file system.
|
||||||
@ -103,7 +103,7 @@ import java.util.Set;
|
|||||||
* <li>SegID is the identifier of the Codec that encoded this segment. </li>
|
* <li>SegID is the identifier of the Codec that encoded this segment. </li>
|
||||||
* <li>CommitUserData stores an optional user-supplied opaque
|
* <li>CommitUserData stores an optional user-supplied opaque
|
||||||
* Map<String,String> that was passed to
|
* Map<String,String> that was passed to
|
||||||
* {@link IndexWriter#setCommitData(java.util.Map)}.</li>
|
* {@link IndexWriter#setLiveCommitData(Iterable)}.</li>
|
||||||
* <li>FieldInfosGen is the generation count of the fieldInfos file. If this is
|
* <li>FieldInfosGen is the generation count of the fieldInfos file. If this is
|
||||||
* -1, there are no updates to the fieldInfos in that segment. Anything above
|
* -1, there are no updates to the fieldInfos in that segment. Anything above
|
||||||
* zero means there are updates to fieldInfos stored by {@link FieldInfosFormat}
|
* zero means there are updates to fieldInfos stored by {@link FieldInfosFormat}
|
||||||
@ -443,7 +443,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
|
|||||||
String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS,
|
String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS,
|
||||||
"",
|
"",
|
||||||
nextGeneration);
|
nextGeneration);
|
||||||
|
|
||||||
// Always advance the generation on write:
|
// Always advance the generation on write:
|
||||||
generation = nextGeneration;
|
generation = nextGeneration;
|
||||||
|
|
||||||
|
@ -229,7 +229,7 @@ public class TestDeletionPolicy extends LuceneTestCase {
|
|||||||
ExpirationTimeDeletionPolicy policy = (ExpirationTimeDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
|
ExpirationTimeDeletionPolicy policy = (ExpirationTimeDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
|
||||||
Map<String,String> commitData = new HashMap<>();
|
Map<String,String> commitData = new HashMap<>();
|
||||||
commitData.put("commitTime", String.valueOf(System.currentTimeMillis()));
|
commitData.put("commitTime", String.valueOf(System.currentTimeMillis()));
|
||||||
writer.setCommitData(commitData);
|
writer.setLiveCommitData(commitData.entrySet());
|
||||||
writer.commit();
|
writer.commit();
|
||||||
writer.close();
|
writer.close();
|
||||||
|
|
||||||
@ -251,7 +251,7 @@ public class TestDeletionPolicy extends LuceneTestCase {
|
|||||||
}
|
}
|
||||||
commitData = new HashMap<>();
|
commitData = new HashMap<>();
|
||||||
commitData.put("commitTime", String.valueOf(System.currentTimeMillis()));
|
commitData.put("commitTime", String.valueOf(System.currentTimeMillis()));
|
||||||
writer.setCommitData(commitData);
|
writer.setLiveCommitData(commitData.entrySet());
|
||||||
writer.commit();
|
writer.commit();
|
||||||
writer.close();
|
writer.close();
|
||||||
|
|
||||||
|
@ -557,14 +557,14 @@ public class TestDirectoryReaderReopen extends LuceneTestCase {
|
|||||||
writer.addDocument(doc);
|
writer.addDocument(doc);
|
||||||
Map<String,String> data = new HashMap<>();
|
Map<String,String> data = new HashMap<>();
|
||||||
data.put("index", i+"");
|
data.put("index", i+"");
|
||||||
writer.setCommitData(data);
|
writer.setLiveCommitData(data.entrySet());
|
||||||
writer.commit();
|
writer.commit();
|
||||||
}
|
}
|
||||||
for(int i=0;i<4;i++) {
|
for(int i=0;i<4;i++) {
|
||||||
writer.deleteDocuments(new Term("id", ""+i));
|
writer.deleteDocuments(new Term("id", ""+i));
|
||||||
Map<String,String> data = new HashMap<>();
|
Map<String,String> data = new HashMap<>();
|
||||||
data.put("index", (4+i)+"");
|
data.put("index", (4+i)+"");
|
||||||
writer.setCommitData(data);
|
writer.setLiveCommitData(data.entrySet());
|
||||||
writer.commit();
|
writer.commit();
|
||||||
}
|
}
|
||||||
writer.close();
|
writer.close();
|
||||||
|
@ -1892,9 +1892,9 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||||||
writer.commit(); // first commit to complete IW create transaction.
|
writer.commit(); // first commit to complete IW create transaction.
|
||||||
|
|
||||||
// this should store the commit data, even though no other changes were made
|
// this should store the commit data, even though no other changes were made
|
||||||
writer.setCommitData(new HashMap<String,String>() {{
|
writer.setLiveCommitData(new HashMap<String,String>() {{
|
||||||
put("key", "value");
|
put("key", "value");
|
||||||
}});
|
}}.entrySet());
|
||||||
writer.commit();
|
writer.commit();
|
||||||
|
|
||||||
DirectoryReader r = DirectoryReader.open(dir);
|
DirectoryReader r = DirectoryReader.open(dir);
|
||||||
@ -1902,13 +1902,13 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||||||
r.close();
|
r.close();
|
||||||
|
|
||||||
// now check setCommitData and prepareCommit/commit sequence
|
// now check setCommitData and prepareCommit/commit sequence
|
||||||
writer.setCommitData(new HashMap<String,String>() {{
|
writer.setLiveCommitData(new HashMap<String,String>() {{
|
||||||
put("key", "value1");
|
put("key", "value1");
|
||||||
}});
|
}}.entrySet());
|
||||||
writer.prepareCommit();
|
writer.prepareCommit();
|
||||||
writer.setCommitData(new HashMap<String,String>() {{
|
writer.setLiveCommitData(new HashMap<String,String>() {{
|
||||||
put("key", "value2");
|
put("key", "value2");
|
||||||
}});
|
}}.entrySet());
|
||||||
writer.commit(); // should commit the first commitData only, per protocol
|
writer.commit(); // should commit the first commitData only, per protocol
|
||||||
|
|
||||||
r = DirectoryReader.open(dir);
|
r = DirectoryReader.open(dir);
|
||||||
@ -1926,21 +1926,32 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||||||
writer.close();
|
writer.close();
|
||||||
dir.close();
|
dir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String,String> getLiveCommitData(IndexWriter writer) {
|
||||||
|
Map<String,String> data = new HashMap<>();
|
||||||
|
Iterable<Map.Entry<String,String>> iter = writer.getLiveCommitData();
|
||||||
|
if (iter != null) {
|
||||||
|
for(Map.Entry<String,String> ent : iter) {
|
||||||
|
data.put(ent.getKey(), ent.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetCommitData() throws Exception {
|
public void testGetCommitData() throws Exception {
|
||||||
Directory dir = newDirectory();
|
Directory dir = newDirectory();
|
||||||
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(null));
|
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(null));
|
||||||
writer.setCommitData(new HashMap<String,String>() {{
|
writer.setLiveCommitData(new HashMap<String,String>() {{
|
||||||
put("key", "value");
|
put("key", "value");
|
||||||
}});
|
}}.entrySet());
|
||||||
assertEquals("value", writer.getCommitData().get("key"));
|
assertEquals("value", getLiveCommitData(writer).get("key"));
|
||||||
writer.close();
|
writer.close();
|
||||||
|
|
||||||
// validate that it's also visible when opening a new IndexWriter
|
// validate that it's also visible when opening a new IndexWriter
|
||||||
writer = new IndexWriter(dir, newIndexWriterConfig(null)
|
writer = new IndexWriter(dir, newIndexWriterConfig(null)
|
||||||
.setOpenMode(OpenMode.APPEND));
|
.setOpenMode(OpenMode.APPEND));
|
||||||
assertEquals("value", writer.getCommitData().get("key"));
|
assertEquals("value", getLiveCommitData(writer).get("key"));
|
||||||
writer.close();
|
writer.close();
|
||||||
|
|
||||||
dir.close();
|
dir.close();
|
||||||
@ -2650,9 +2661,9 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||||||
DirectoryReader r = DirectoryReader.open(w);
|
DirectoryReader r = DirectoryReader.open(w);
|
||||||
Map<String,String> m = new HashMap<>();
|
Map<String,String> m = new HashMap<>();
|
||||||
m.put("foo", "bar");
|
m.put("foo", "bar");
|
||||||
w.setCommitData(m);
|
w.setLiveCommitData(m.entrySet());
|
||||||
|
|
||||||
// setCommitData with no other changes should count as an NRT change:
|
// setLiveCommitData with no other changes should count as an NRT change:
|
||||||
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
|
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
|
||||||
assertNotNull(r2);
|
assertNotNull(r2);
|
||||||
|
|
||||||
@ -2669,9 +2680,9 @@ public class TestIndexWriter extends LuceneTestCase {
|
|||||||
DirectoryReader r = DirectoryReader.open(w);
|
DirectoryReader r = DirectoryReader.open(w);
|
||||||
Map<String,String> m = new HashMap<>();
|
Map<String,String> m = new HashMap<>();
|
||||||
m.put("foo", "bar");
|
m.put("foo", "bar");
|
||||||
w.setCommitData(m);
|
w.setLiveCommitData(m.entrySet());
|
||||||
w.commit();
|
w.commit();
|
||||||
// setCommitData and also commit, with no other changes, should count as an NRT change:
|
// setLiveCommitData and also commit, with no other changes, should count as an NRT change:
|
||||||
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
|
DirectoryReader r2 = DirectoryReader.openIfChanged(r);
|
||||||
assertNotNull(r2);
|
assertNotNull(r2);
|
||||||
IOUtils.close(r, r2, w, dir);
|
IOUtils.close(r, r2, w, dir);
|
||||||
|
@ -19,6 +19,7 @@ package org.apache.lucene.index;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
@ -421,13 +422,13 @@ public class TestIndexWriterCommit extends LuceneTestCase {
|
|||||||
// commit to "first"
|
// commit to "first"
|
||||||
Map<String,String> commitData = new HashMap<>();
|
Map<String,String> commitData = new HashMap<>();
|
||||||
commitData.put("tag", "first");
|
commitData.put("tag", "first");
|
||||||
w.setCommitData(commitData);
|
w.setLiveCommitData(commitData.entrySet());
|
||||||
w.commit();
|
w.commit();
|
||||||
|
|
||||||
// commit to "second"
|
// commit to "second"
|
||||||
w.addDocument(doc);
|
w.addDocument(doc);
|
||||||
commitData.put("tag", "second");
|
commitData.put("tag", "second");
|
||||||
w.setCommitData(commitData);
|
w.setLiveCommitData(commitData.entrySet());
|
||||||
w.close();
|
w.close();
|
||||||
|
|
||||||
// open "first" with IndexWriter
|
// open "first" with IndexWriter
|
||||||
@ -450,7 +451,7 @@ public class TestIndexWriterCommit extends LuceneTestCase {
|
|||||||
// commit IndexWriter to "third"
|
// commit IndexWriter to "third"
|
||||||
w.addDocument(doc);
|
w.addDocument(doc);
|
||||||
commitData.put("tag", "third");
|
commitData.put("tag", "third");
|
||||||
w.setCommitData(commitData);
|
w.setLiveCommitData(commitData.entrySet());
|
||||||
w.close();
|
w.close();
|
||||||
|
|
||||||
// make sure "second" commit is still there
|
// make sure "second" commit is still there
|
||||||
@ -632,7 +633,7 @@ public class TestIndexWriterCommit extends LuceneTestCase {
|
|||||||
TestIndexWriter.addDoc(w);
|
TestIndexWriter.addDoc(w);
|
||||||
Map<String,String> data = new HashMap<>();
|
Map<String,String> data = new HashMap<>();
|
||||||
data.put("label", "test1");
|
data.put("label", "test1");
|
||||||
w.setCommitData(data);
|
w.setLiveCommitData(data.entrySet());
|
||||||
w.close();
|
w.close();
|
||||||
|
|
||||||
r = DirectoryReader.open(dir);
|
r = DirectoryReader.open(dir);
|
||||||
@ -663,4 +664,32 @@ public class TestIndexWriterCommit extends LuceneTestCase {
|
|||||||
r.close();
|
r.close();
|
||||||
dir.close();
|
dir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LUCENE-7335: make sure commit data is late binding
|
||||||
|
public void testCommitDataIsLive() throws Exception {
|
||||||
|
Directory dir = newDirectory();
|
||||||
|
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
|
||||||
|
w.addDocument(new Document());
|
||||||
|
|
||||||
|
final Map<String,String> commitData = new HashMap<>();
|
||||||
|
commitData.put("foo", "bar");
|
||||||
|
|
||||||
|
// make sure "foo" / "bar" doesn't take
|
||||||
|
w.setLiveCommitData(commitData.entrySet());
|
||||||
|
|
||||||
|
commitData.clear();
|
||||||
|
commitData.put("boo", "baz");
|
||||||
|
|
||||||
|
// this finally does the commit, and should burn "boo" / "baz"
|
||||||
|
w.close();
|
||||||
|
|
||||||
|
List<IndexCommit> commits = DirectoryReader.listCommits(dir);
|
||||||
|
assertEquals(1, commits.size());
|
||||||
|
|
||||||
|
IndexCommit commit = commits.get(0);
|
||||||
|
Map<String,String> data = commit.getUserData();
|
||||||
|
assertEquals(1, data.size());
|
||||||
|
assertEquals("baz", data.get("boo"));
|
||||||
|
dir.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ public class TestTransactionRollback extends LuceneTestCase {
|
|||||||
.setIndexCommit(last));
|
.setIndexCommit(last));
|
||||||
Map<String,String> data = new HashMap<>();
|
Map<String,String> data = new HashMap<>();
|
||||||
data.put("index", "Rolled back to 1-"+id);
|
data.put("index", "Rolled back to 1-"+id);
|
||||||
w.setCommitData(data);
|
w.setLiveCommitData(data.entrySet());
|
||||||
w.close();
|
w.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -142,7 +142,7 @@ public class TestTransactionRollback extends LuceneTestCase {
|
|||||||
if (currentRecordId%10 == 0) {
|
if (currentRecordId%10 == 0) {
|
||||||
Map<String,String> data = new HashMap<>();
|
Map<String,String> data = new HashMap<>();
|
||||||
data.put("index", "records 1-"+currentRecordId);
|
data.put("index", "records 1-"+currentRecordId);
|
||||||
w.setCommitData(data);
|
w.setLiveCommitData(data.entrySet());
|
||||||
w.commit();
|
w.commit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -205,7 +205,7 @@ public abstract class TaxonomyReader implements Closeable {
|
|||||||
/**
|
/**
|
||||||
* Retrieve user committed data.
|
* Retrieve user committed data.
|
||||||
*
|
*
|
||||||
* @see TaxonomyWriter#setCommitData(Map)
|
* @see TaxonomyWriter#setLiveCommitData(Iterable)
|
||||||
*/
|
*/
|
||||||
public abstract Map<String, String> getCommitUserData() throws IOException;
|
public abstract Map<String, String> getCommitUserData() throws IOException;
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ import java.io.Closeable;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.index.TwoPhaseCommit;
|
import org.apache.lucene.index.TwoPhaseCommit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -105,19 +106,14 @@ public interface TaxonomyWriter extends Closeable, TwoPhaseCommit {
|
|||||||
public int getSize();
|
public int getSize();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the commit user data map. That method is considered a transaction and
|
* Sets the commit user data iterable. See {@link IndexWriter#setLiveCommitData}.
|
||||||
* will be {@link #commit() committed} even if no other changes were made to
|
|
||||||
* the writer instance.
|
|
||||||
* <p>
|
|
||||||
* <b>NOTE:</b> the map is cloned internally, therefore altering the map's
|
|
||||||
* contents after calling this method has no effect.
|
|
||||||
*/
|
*/
|
||||||
public void setCommitData(Map<String,String> commitUserData);
|
public void setLiveCommitData(Iterable<Map.Entry<String,String>> commitUserData);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the commit user data map that was set on
|
* Returns the commit user data iterable that was set on
|
||||||
* {@link #setCommitData(Map)}.
|
* {@link #setLiveCommitData(Iterable)}.
|
||||||
*/
|
*/
|
||||||
public Map<String,String> getCommitData();
|
public Iterable<Map.Entry<String,String>> getLiveCommitData();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -584,31 +584,42 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
|
|||||||
public synchronized long commit() throws IOException {
|
public synchronized long commit() throws IOException {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
// LUCENE-4972: if we always call setCommitData, we create empty commits
|
// LUCENE-4972: if we always call setCommitData, we create empty commits
|
||||||
String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH);
|
|
||||||
|
Map<String,String> data = new HashMap<>();
|
||||||
|
Iterable<Map.Entry<String,String>> iter = indexWriter.getLiveCommitData();
|
||||||
|
if (iter != null) {
|
||||||
|
for(Map.Entry<String,String> ent : iter) {
|
||||||
|
data.put(ent.getKey(), ent.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String epochStr = data.get(INDEX_EPOCH);
|
||||||
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
|
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
|
||||||
indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData()));
|
indexWriter.setLiveCommitData(combinedCommitData(indexWriter.getLiveCommitData()));
|
||||||
}
|
}
|
||||||
return indexWriter.commit();
|
return indexWriter.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Combine original user data with the taxonomy epoch. */
|
/** Combine original user data with the taxonomy epoch. */
|
||||||
private Map<String,String> combinedCommitData(Map<String,String> commitData) {
|
private Iterable<Map.Entry<String,String>> combinedCommitData(Iterable<Map.Entry<String,String>> commitData) {
|
||||||
Map<String,String> m = new HashMap<>();
|
Map<String,String> m = new HashMap<>();
|
||||||
if (commitData != null) {
|
if (commitData != null) {
|
||||||
m.putAll(commitData);
|
for(Map.Entry<String,String> ent : commitData) {
|
||||||
|
m.put(ent.getKey(), ent.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
m.put(INDEX_EPOCH, Long.toString(indexEpoch, 16));
|
m.put(INDEX_EPOCH, Long.toString(indexEpoch, 16));
|
||||||
return m;
|
return m.entrySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setCommitData(Map<String,String> commitUserData) {
|
public void setLiveCommitData(Iterable<Map.Entry<String,String>> commitUserData) {
|
||||||
indexWriter.setCommitData(combinedCommitData(commitUserData));
|
indexWriter.setLiveCommitData(combinedCommitData(commitUserData));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String,String> getCommitData() {
|
public Iterable<Map.Entry<String,String>> getLiveCommitData() {
|
||||||
return combinedCommitData(indexWriter.getCommitData());
|
return combinedCommitData(indexWriter.getLiveCommitData());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -619,9 +630,16 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
|
|||||||
public synchronized long prepareCommit() throws IOException {
|
public synchronized long prepareCommit() throws IOException {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
// LUCENE-4972: if we always call setCommitData, we create empty commits
|
// LUCENE-4972: if we always call setCommitData, we create empty commits
|
||||||
String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH);
|
Map<String,String> data = new HashMap<>();
|
||||||
|
Iterable<Map.Entry<String,String>> iter = indexWriter.getLiveCommitData();
|
||||||
|
if (iter != null) {
|
||||||
|
for(Map.Entry<String,String> ent : iter) {
|
||||||
|
data.put(ent.getKey(), ent.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String epochStr = data.get(INDEX_EPOCH);
|
||||||
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
|
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
|
||||||
indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData()));
|
indexWriter.setLiveCommitData(combinedCommitData(indexWriter.getLiveCommitData()));
|
||||||
}
|
}
|
||||||
return indexWriter.prepareCommit();
|
return indexWriter.prepareCommit();
|
||||||
}
|
}
|
||||||
|
@ -94,7 +94,7 @@ public class TestDirectoryTaxonomyWriter extends FacetTestCase {
|
|||||||
taxoWriter.addCategory(new FacetLabel("b"));
|
taxoWriter.addCategory(new FacetLabel("b"));
|
||||||
Map<String, String> userCommitData = new HashMap<>();
|
Map<String, String> userCommitData = new HashMap<>();
|
||||||
userCommitData.put("testing", "1 2 3");
|
userCommitData.put("testing", "1 2 3");
|
||||||
taxoWriter.setCommitData(userCommitData);
|
taxoWriter.setLiveCommitData(userCommitData.entrySet());
|
||||||
taxoWriter.close();
|
taxoWriter.close();
|
||||||
DirectoryReader r = DirectoryReader.open(dir);
|
DirectoryReader r = DirectoryReader.open(dir);
|
||||||
assertEquals("2 categories plus root should have been committed to the underlying directory", 3, r.numDocs());
|
assertEquals("2 categories plus root should have been committed to the underlying directory", 3, r.numDocs());
|
||||||
@ -109,14 +109,22 @@ public class TestDirectoryTaxonomyWriter extends FacetTestCase {
|
|||||||
// that the taxonomy index has been recreated.
|
// that the taxonomy index has been recreated.
|
||||||
taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE_OR_APPEND, NO_OP_CACHE);
|
taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE_OR_APPEND, NO_OP_CACHE);
|
||||||
taxoWriter.addCategory(new FacetLabel("c")); // add a category so that commit will happen
|
taxoWriter.addCategory(new FacetLabel("c")); // add a category so that commit will happen
|
||||||
taxoWriter.setCommitData(new HashMap<String, String>(){{
|
taxoWriter.setLiveCommitData(new HashMap<String, String>(){{
|
||||||
put("just", "data");
|
put("just", "data");
|
||||||
}});
|
}}.entrySet());
|
||||||
taxoWriter.commit();
|
taxoWriter.commit();
|
||||||
|
|
||||||
// verify taxoWriter.getCommitData()
|
// verify taxoWriter.getCommitData()
|
||||||
|
Map<String,String> data = new HashMap<>();
|
||||||
|
Iterable<Map.Entry<String,String>> iter = taxoWriter.getLiveCommitData();
|
||||||
|
if (iter != null) {
|
||||||
|
for(Map.Entry<String,String> ent : iter) {
|
||||||
|
data.put(ent.getKey(), ent.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
assertNotNull(DirectoryTaxonomyWriter.INDEX_EPOCH
|
assertNotNull(DirectoryTaxonomyWriter.INDEX_EPOCH
|
||||||
+ " not found in taoxWriter.commitData", taxoWriter.getCommitData().get(DirectoryTaxonomyWriter.INDEX_EPOCH));
|
+ " not found in taoxWriter.commitData", data.get(DirectoryTaxonomyWriter.INDEX_EPOCH));
|
||||||
taxoWriter.close();
|
taxoWriter.close();
|
||||||
|
|
||||||
r = DirectoryReader.open(dir);
|
r = DirectoryReader.open(dir);
|
||||||
@ -170,9 +178,9 @@ public class TestDirectoryTaxonomyWriter extends FacetTestCase {
|
|||||||
|
|
||||||
private void touchTaxo(DirectoryTaxonomyWriter taxoWriter, FacetLabel cp) throws IOException {
|
private void touchTaxo(DirectoryTaxonomyWriter taxoWriter, FacetLabel cp) throws IOException {
|
||||||
taxoWriter.addCategory(cp);
|
taxoWriter.addCategory(cp);
|
||||||
taxoWriter.setCommitData(new HashMap<String, String>(){{
|
taxoWriter.setLiveCommitData(new HashMap<String, String>(){{
|
||||||
put("just", "data");
|
put("just", "data");
|
||||||
}});
|
}}.entrySet());
|
||||||
taxoWriter.commit();
|
taxoWriter.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,10 +83,16 @@ public abstract class PrimaryNode extends Node {
|
|||||||
|
|
||||||
message("IWC:\n" + writer.getConfig());
|
message("IWC:\n" + writer.getConfig());
|
||||||
message("dir:\n" + writer.getDirectory());
|
message("dir:\n" + writer.getDirectory());
|
||||||
message("commitData: " + writer.getCommitData());
|
message("commitData: " + writer.getLiveCommitData());
|
||||||
|
|
||||||
// Record our primaryGen in the userData, and set initial version to 0:
|
// Record our primaryGen in the userData, and set initial version to 0:
|
||||||
Map<String,String> commitData = new HashMap<>(writer.getCommitData());
|
Map<String,String> commitData = new HashMap<>();
|
||||||
|
Iterable<Map.Entry<String,String>> iter = writer.getLiveCommitData();
|
||||||
|
if (iter != null) {
|
||||||
|
for(Map.Entry<String,String> ent : iter) {
|
||||||
|
commitData.put(ent.getKey(), ent.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
commitData.put(PRIMARY_GEN_KEY, Long.toString(primaryGen));
|
commitData.put(PRIMARY_GEN_KEY, Long.toString(primaryGen));
|
||||||
if (commitData.get(VERSION_KEY) == null) {
|
if (commitData.get(VERSION_KEY) == null) {
|
||||||
commitData.put(VERSION_KEY, "0");
|
commitData.put(VERSION_KEY, "0");
|
||||||
@ -94,7 +100,7 @@ public abstract class PrimaryNode extends Node {
|
|||||||
} else {
|
} else {
|
||||||
message("keep current commitData version=" + commitData.get(VERSION_KEY));
|
message("keep current commitData version=" + commitData.get(VERSION_KEY));
|
||||||
}
|
}
|
||||||
writer.setCommitData(commitData, false);
|
writer.setLiveCommitData(commitData.entrySet(), false);
|
||||||
|
|
||||||
// We forcefully advance the SIS version to an unused future version. This is necessary if the previous primary crashed and we are
|
// We forcefully advance the SIS version to an unused future version. This is necessary if the previous primary crashed and we are
|
||||||
// starting up on an "older" index, else versions can be illegally reused but show different results:
|
// starting up on an "older" index, else versions can be illegally reused but show different results:
|
||||||
@ -153,10 +159,16 @@ public abstract class PrimaryNode extends Node {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public synchronized long getLastCommitVersion() {
|
public synchronized long getLastCommitVersion() {
|
||||||
String s = curInfos.getUserData().get(VERSION_KEY);
|
Iterable<Map.Entry<String,String>> iter = writer.getLiveCommitData();
|
||||||
|
assert iter != null;
|
||||||
|
for(Map.Entry<String,String> ent : iter) {
|
||||||
|
if (ent.getKey().equals(VERSION_KEY)) {
|
||||||
|
return Long.parseLong(ent.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// In ctor we always install an initial version:
|
// In ctor we always install an initial version:
|
||||||
assert s != null;
|
throw new AssertionError("missing VERSION_KEY");
|
||||||
return Long.parseLong(s);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -167,7 +179,7 @@ public abstract class PrimaryNode extends Node {
|
|||||||
// on xlog replay we are replaying more ops than necessary.
|
// on xlog replay we are replaying more ops than necessary.
|
||||||
commitData.put(VERSION_KEY, Long.toString(copyState.version));
|
commitData.put(VERSION_KEY, Long.toString(copyState.version));
|
||||||
message("top: commit commitData=" + commitData);
|
message("top: commit commitData=" + commitData);
|
||||||
writer.setCommitData(commitData, false);
|
writer.setLiveCommitData(commitData.entrySet(), false);
|
||||||
writer.commit();
|
writer.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,9 +172,9 @@ public class IndexAndTaxonomyReplicationClientTest extends ReplicatorTestCase {
|
|||||||
|
|
||||||
private Revision createRevision(final int id) throws IOException {
|
private Revision createRevision(final int id) throws IOException {
|
||||||
publishIndexWriter.addDocument(newDocument(publishTaxoWriter, id));
|
publishIndexWriter.addDocument(newDocument(publishTaxoWriter, id));
|
||||||
publishIndexWriter.setCommitData(new HashMap<String, String>() {{
|
publishIndexWriter.setLiveCommitData(new HashMap<String, String>() {{
|
||||||
put(VERSION_ID, Integer.toString(id, 16));
|
put(VERSION_ID, Integer.toString(id, 16));
|
||||||
}});
|
}}.entrySet());
|
||||||
publishIndexWriter.commit();
|
publishIndexWriter.commit();
|
||||||
publishTaxoWriter.commit();
|
publishTaxoWriter.commit();
|
||||||
return new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter);
|
return new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter);
|
||||||
|
@ -122,9 +122,9 @@ public class IndexReplicationClientTest extends ReplicatorTestCase {
|
|||||||
|
|
||||||
private Revision createRevision(final int id) throws IOException {
|
private Revision createRevision(final int id) throws IOException {
|
||||||
publishWriter.addDocument(new Document());
|
publishWriter.addDocument(new Document());
|
||||||
publishWriter.setCommitData(new HashMap<String, String>() {{
|
publishWriter.setLiveCommitData(new HashMap<String, String>() {{
|
||||||
put(VERSION_ID, Integer.toString(id, 16));
|
put(VERSION_ID, Integer.toString(id, 16));
|
||||||
}});
|
}}.entrySet());
|
||||||
publishWriter.commit();
|
publishWriter.commit();
|
||||||
return new IndexRevision(publishWriter);
|
return new IndexRevision(publishWriter);
|
||||||
}
|
}
|
||||||
|
@ -65,9 +65,9 @@ public class LocalReplicatorTest extends ReplicatorTestCase {
|
|||||||
|
|
||||||
private Revision createRevision(final int id) throws IOException {
|
private Revision createRevision(final int id) throws IOException {
|
||||||
sourceWriter.addDocument(new Document());
|
sourceWriter.addDocument(new Document());
|
||||||
sourceWriter.setCommitData(new HashMap<String, String>() {{
|
sourceWriter.setLiveCommitData(new HashMap<String, String>() {{
|
||||||
put(VERSION_ID, Integer.toString(id, 16));
|
put(VERSION_ID, Integer.toString(id, 16));
|
||||||
}});
|
}}.entrySet());
|
||||||
sourceWriter.commit();
|
sourceWriter.commit();
|
||||||
return new IndexRevision(sourceWriter);
|
return new IndexRevision(sourceWriter);
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ public class HttpReplicatorTest extends ReplicatorTestCase {
|
|||||||
private void publishRevision(int id) throws IOException {
|
private void publishRevision(int id) throws IOException {
|
||||||
Document doc = new Document();
|
Document doc = new Document();
|
||||||
writer.addDocument(doc);
|
writer.addDocument(doc);
|
||||||
writer.setCommitData(Collections.singletonMap("ID", Integer.toString(id, 16)));
|
writer.setLiveCommitData(Collections.singletonMap("ID", Integer.toString(id, 16)).entrySet());
|
||||||
writer.commit();
|
writer.commit();
|
||||||
serverReplicator.publish(new IndexRevision(writer));
|
serverReplicator.publish(new IndexRevision(writer));
|
||||||
}
|
}
|
||||||
|
@ -523,7 +523,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
|
|||||||
final Map<String,String> commitData = new HashMap<>();
|
final Map<String,String> commitData = new HashMap<>();
|
||||||
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
|
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
|
||||||
String.valueOf(System.currentTimeMillis()));
|
String.valueOf(System.currentTimeMillis()));
|
||||||
iw.setCommitData(commitData);
|
iw.setLiveCommitData(commitData.entrySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void prepareCommit(CommitUpdateCommand cmd) throws IOException {
|
public void prepareCommit(CommitUpdateCommand cmd) throws IOException {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user