mirror of https://github.com/apache/lucene.git
LUCENE-9309: Wait for #addIndexes merges when aborting merges (#1418)
The SegmentMerger usage in IW#addIndexes(CodecReader...) might make changes to the Directory while the IW tries to clean-up files on rollback. This causes issues like FileNotFoundExceptions when IDF tries to remove temp files. This changes adds a waiting mechanism to the abortMerges method that, in addition to the running merges, also waits for merges in addIndices(CodecReader...)
This commit is contained in:
parent
2935186c5b
commit
e376582e25
|
@ -167,6 +167,8 @@ Bug Fixes
|
||||||
* LUCENE-9300: Fix corruption of the new gen field infos when doc values updates are applied on a segment created
|
* LUCENE-9300: Fix corruption of the new gen field infos when doc values updates are applied on a segment created
|
||||||
externally and added to the index with IndexWriter#addIndexes(Directory). (Jim Ferenczi, Adrien Grand)
|
externally and added to the index with IndexWriter#addIndexes(Directory). (Jim Ferenczi, Adrien Grand)
|
||||||
|
|
||||||
|
* LUCENE-9309: Wait for #addIndexes merges when aborting merges. (Simon Willnauer)
|
||||||
|
|
||||||
Other
|
Other
|
||||||
---------------------
|
---------------------
|
||||||
|
|
||||||
|
|
|
@ -371,10 +371,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int availablePermits() {
|
|
||||||
return permits.availablePermits();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final IndexFileDeleter deleter;
|
final IndexFileDeleter deleter;
|
||||||
|
@ -397,11 +393,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
||||||
HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
|
HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
|
||||||
|
|
||||||
private final MergeScheduler mergeScheduler;
|
private final MergeScheduler mergeScheduler;
|
||||||
|
private Set<SegmentMerger> runningAddIndexesMerges = new HashSet<>();
|
||||||
private LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<>();
|
private LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<>();
|
||||||
private Set<MergePolicy.OneMerge> runningMerges = new HashSet<>();
|
private Set<MergePolicy.OneMerge> runningMerges = new HashSet<>();
|
||||||
private List<MergePolicy.OneMerge> mergeExceptions = new ArrayList<>();
|
private List<MergePolicy.OneMerge> mergeExceptions = new ArrayList<>();
|
||||||
private long mergeGen;
|
private long mergeGen;
|
||||||
private boolean stopMerges;
|
private boolean stopMerges; // TODO make sure this is only changed once and never set back to false
|
||||||
private boolean didMessageState;
|
private boolean didMessageState;
|
||||||
|
|
||||||
final AtomicInteger flushCount = new AtomicInteger();
|
final AtomicInteger flushCount = new AtomicInteger();
|
||||||
|
@ -2285,6 +2282,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
stopMerges = true; // this disables merges forever
|
||||||
abortMerges();
|
abortMerges();
|
||||||
if (infoStream.isEnabled("IW")) {
|
if (infoStream.isEnabled("IW")) {
|
||||||
infoStream.message("IW", "rollback: done finish merges");
|
infoStream.message("IW", "rollback: done finish merges");
|
||||||
|
@ -2447,8 +2445,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
||||||
try {
|
try {
|
||||||
// Abort any running merges
|
// Abort any running merges
|
||||||
abortMerges();
|
abortMerges();
|
||||||
// Let merges run again
|
|
||||||
stopMerges = false;
|
|
||||||
adjustPendingNumDocs(-segmentInfos.totalMaxDoc());
|
adjustPendingNumDocs(-segmentInfos.totalMaxDoc());
|
||||||
// Remove all segments
|
// Remove all segments
|
||||||
segmentInfos.clear();
|
segmentInfos.clear();
|
||||||
|
@ -2491,9 +2487,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
||||||
* method: when you abort a long-running merge, you lose
|
* method: when you abort a long-running merge, you lose
|
||||||
* a lot of work that must later be redone. */
|
* a lot of work that must later be redone. */
|
||||||
private synchronized void abortMerges() {
|
private synchronized void abortMerges() {
|
||||||
|
|
||||||
stopMerges = true;
|
|
||||||
|
|
||||||
// Abort all pending & running merges:
|
// Abort all pending & running merges:
|
||||||
for (final MergePolicy.OneMerge merge : pendingMerges) {
|
for (final MergePolicy.OneMerge merge : pendingMerges) {
|
||||||
if (infoStream.isEnabled("IW")) {
|
if (infoStream.isEnabled("IW")) {
|
||||||
|
@ -2514,10 +2507,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
||||||
// We wait here to make all merges stop. It should not
|
// We wait here to make all merges stop. It should not
|
||||||
// take very long because they periodically check if
|
// take very long because they periodically check if
|
||||||
// they are aborted.
|
// they are aborted.
|
||||||
while (runningMerges.size() != 0) {
|
while (runningMerges.size() + runningAddIndexesMerges.size() != 0) {
|
||||||
|
|
||||||
if (infoStream.isEnabled("IW")) {
|
if (infoStream.isEnabled("IW")) {
|
||||||
infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort");
|
infoStream.message("IW", "now wait for " + runningMerges.size()
|
||||||
|
+ " running merge/s to abort; currently running addIndexes: " + runningAddIndexesMerges.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
doWait();
|
doWait();
|
||||||
|
@ -2993,7 +2987,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
|
||||||
return docWriter.deleteQueue.getNextSequenceNumber();
|
return docWriter.deleteQueue.getNextSequenceNumber();
|
||||||
}
|
}
|
||||||
|
|
||||||
merger.merge(); // merge 'em
|
synchronized (this) {
|
||||||
|
ensureOpen();
|
||||||
|
assert stopMerges == false;
|
||||||
|
runningAddIndexesMerges.add(merger);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
merger.merge(); // merge 'em
|
||||||
|
} finally {
|
||||||
|
synchronized (this) {
|
||||||
|
runningAddIndexesMerges.remove(merger);
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
SegmentCommitInfo infoPerCommit = new SegmentCommitInfo(info, 0, numSoftDeleted, -1L, -1L, -1L);
|
SegmentCommitInfo infoPerCommit = new SegmentCommitInfo(info, 0, numSoftDeleted, -1L, -1L, -1L);
|
||||||
|
|
||||||
info.setFiles(new HashSet<>(trackingDir.getCreatedFiles()));
|
info.setFiles(new HashSet<>(trackingDir.getCreatedFiles()));
|
||||||
|
|
|
@ -208,7 +208,7 @@ final class SegmentMerger {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void mergeFieldInfos() throws IOException {
|
public void mergeFieldInfos() {
|
||||||
for (FieldInfos readerFieldInfos : mergeState.fieldInfos) {
|
for (FieldInfos readerFieldInfos : mergeState.fieldInfos) {
|
||||||
for (FieldInfo fi : readerFieldInfos) {
|
for (FieldInfo fi : readerFieldInfos) {
|
||||||
fieldInfosBuilder.add(fi);
|
fieldInfosBuilder.add(fi);
|
||||||
|
|
Loading…
Reference in New Issue