Consolidated process event logic after CRUD action (#1325)

Today we have duplicated logic on how to convert a seqNo into a real
seqNo and process events based on this. This change consolidated the logic
into a single method.
This commit is contained in:
Simon Willnauer 2020-03-09 18:47:43 +01:00 committed by GitHub
parent c8dea5d77f
commit 44bdfb2a07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 42 deletions

View File

@ -332,7 +332,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private final ReaderPool readerPool; private final ReaderPool readerPool;
final BufferedUpdatesStream bufferedUpdatesStream; final BufferedUpdatesStream bufferedUpdatesStream;
/** Counts how many merges have completed; this is used by {@link FrozenBufferedUpdates#apply} /** Counts how many merges have completed; this is used by {@link FrozenBufferedUpdates#forceApply(IndexWriter)}
* to handle concurrently apply deletes/updates with merges completing. */ * to handle concurrently apply deletes/updates with merges completing. */
final AtomicLong mergeFinishedGen = new AtomicLong(); final AtomicLong mergeFinishedGen = new AtomicLong();
@ -1281,11 +1281,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
ensureOpen(); ensureOpen();
boolean success = false; boolean success = false;
try { try {
long seqNo = docWriter.updateDocuments(docs, analyzer, delNode); final long seqNo = maybeProcessEvents(docWriter.updateDocuments(docs, analyzer, delNode));
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
success = true; success = true;
return seqNo; return seqNo;
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
@ -1518,12 +1514,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
public long deleteDocuments(Term... terms) throws IOException { public long deleteDocuments(Term... terms) throws IOException {
ensureOpen(); ensureOpen();
try { try {
long seqNo = docWriter.deleteTerms(terms); return maybeProcessEvents(docWriter.deleteTerms(terms));
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteDocuments(Term..)"); tragicEvent(tragedy, "deleteDocuments(Term..)");
throw tragedy; throw tragedy;
@ -1553,13 +1544,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} }
try { try {
long seqNo = docWriter.deleteQueries(queries); return maybeProcessEvents(docWriter.deleteQueries(queries));
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteDocuments(Query..)"); tragicEvent(tragedy, "deleteDocuments(Query..)");
throw tragedy; throw tragedy;
@ -1591,11 +1576,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
ensureOpen(); ensureOpen();
boolean success = false; boolean success = false;
try { try {
long seqNo = docWriter.updateDocument(doc, analyzer, delNode); final long seqNo = maybeProcessEvents(docWriter.updateDocument(doc, analyzer, delNode));
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
success = true; success = true;
return seqNo; return seqNo;
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
@ -1684,12 +1665,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
throw new IllegalArgumentException("cannot update docvalues field involved in the index sort, field=" + field + ", sort=" + config.getIndexSort()); throw new IllegalArgumentException("cannot update docvalues field involved in the index sort, field=" + field + ", sort=" + config.getIndexSort());
} }
try { try {
long seqNo = docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value)); return maybeProcessEvents(docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value)));
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateNumericDocValue"); tragicEvent(tragedy, "updateNumericDocValue");
throw tragedy; throw tragedy;
@ -1729,12 +1705,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
throw new IllegalArgumentException("can only update existing binary-docvalues fields!"); throw new IllegalArgumentException("can only update existing binary-docvalues fields!");
} }
try { try {
long seqNo = docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value)); return maybeProcessEvents(docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value)));
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateBinaryDocValue"); tragicEvent(tragedy, "updateBinaryDocValue");
throw tragedy; throw tragedy;
@ -1764,12 +1735,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
ensureOpen(); ensureOpen();
DocValuesUpdate[] dvUpdates = buildDocValuesUpdate(term, updates); DocValuesUpdate[] dvUpdates = buildDocValuesUpdate(term, updates);
try { try {
long seqNo = docWriter.updateDocValues(dvUpdates); return maybeProcessEvents(docWriter.updateDocValues(dvUpdates));
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocValues"); tragicEvent(tragedy, "updateDocValues");
throw tragedy; throw tragedy;
@ -5091,6 +5057,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
infoStream.message("IW", "decRefDeleter for NRT reader version=" + segmentInfos.getVersion() + " segments=" + segString(segmentInfos)); infoStream.message("IW", "decRefDeleter for NRT reader version=" + segmentInfos.getVersion() + " segments=" + segString(segmentInfos));
} }
} }
/**
* Processes all events and might trigger a merge if the given seqNo is negative
* @param seqNo if the seqNo is less than 0 this method will process events otherwise it's a no-op.
* @return the given seqId inverted if negative.
*/
private long maybeProcessEvents(long seqNo) throws IOException {
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
return seqNo;
}
private void processEvents(boolean triggerMerge) throws IOException { private void processEvents(boolean triggerMerge) throws IOException {
if (tragedy.get() == null) { if (tragedy.get() == null) {