LUCENE-7888: fix concurrency hazards between merge completing and DV updates applying

This commit is contained in:
Mike McCandless 2017-07-02 15:58:53 -04:00
parent ee1edd9d46
commit eaf1d45a1c
3 changed files with 23 additions and 9 deletions

View File

@ -262,6 +262,8 @@ class FrozenBufferedUpdates {
int totalSegmentCount = 0;
long totalDelCount = 0;
boolean finished = false;
// Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
// concurrent merges are running. Once we are done, we check to see if a merge completed while we were running. If so, we must retry
// resolving against the newly merged segment(s). Eventually no merge finishes while we were running and we are done.
@ -334,7 +336,7 @@ class FrozenBufferedUpdates {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
messagePrefix + "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
}
@ -352,6 +354,13 @@ class FrozenBufferedUpdates {
if (mergeGenCur == mergeGenStart) {
// Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
// Record that this packet is finished:
writer.bufferedUpdatesStream.finished(this);
finished = true;
// No merge finished while we were applying, so we are done!
break;
}
@ -367,9 +376,11 @@ class FrozenBufferedUpdates {
iter++;
}
// Record that this packet is finished:
writer.bufferedUpdatesStream.finished(this);
if (finished == false) {
// Record that this packet is finished:
writer.bufferedUpdatesStream.finished(this);
}
if (infoStream.isEnabled("BD")) {
String message = String.format(Locale.ROOT,
"done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",

View File

@ -642,12 +642,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
void writeDocValuesUpdates(List<SegmentCommitInfo> infos) throws IOException {
void writeDocValuesUpdatesForMerge(List<SegmentCommitInfo> infos) throws IOException {
boolean any = false;
for (SegmentCommitInfo info : infos) {
ReadersAndUpdates rld = get(info, false);
if (rld != null) {
any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
rld.setIsMerging();
}
}
if (any) {
@ -4216,7 +4217,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Must move the pending doc values updates to disk now, else the newly merged segment will not see them:
// TODO: we could fix merging to pull the merged DV iterator so we don't have to move these updates to disk first, i.e. just carry them
// in memory:
readerPool.writeDocValuesUpdates(merge.segments);
readerPool.writeDocValuesUpdatesForMerge(merge.segments);
// Bind a new segment name here so even with
// ConcurrentMergePolicy we keep deterministic segment

View File

@ -808,14 +808,16 @@ class ReadersAndUpdates {
return true;
}
/** Returns a reader for merge, with the latest doc values updates and deletions. */
synchronized SegmentReader getReaderForMerge(IOContext context) throws IOException {
synchronized public void setIsMerging() {
// This ensures any newly resolved doc value updates while we are merging are
// saved for re-applying after this segment is done merging:
isMerging = true;
assert mergingDVUpdates.isEmpty();
}
/** Returns a reader for merge, with the latest doc values updates and deletions. */
synchronized SegmentReader getReaderForMerge(IOContext context) throws IOException {
// We must carry over any still-pending DV updates because they were not
// successfully written, e.g. because there was a hole in the delGens,