From ef291c9767640198e1d79132b460a88f93b25456 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Mon, 12 Mar 2018 12:27:06 +0100
Subject: [PATCH] LUCENE-8200: Allow doc-values to be updated atomically
 together with a document

Today we can only update a document by deleting all previously indexed
documents for the given term. In some cases, when deletes are not `final`
in the sense that documents marked as deleted should not be merged away,
a `soft-delete` is needed; this is possible when doc-values updates can be
done atomically, just like delete and add in updateDocument(s).

This change introduces such a soft update that reuses all code paths from
deletes to update all previously indexed documents for a given term instead
of marking them as deleted.

This is a spin-off from LUCENE-8198
---
 lucene/CHANGES.txt                                 |   9 +-
 .../apache/lucene/index/DocumentsWriter.java       |  12 +-
 .../index/DocumentsWriterDeleteQueue.java          |  45 ++-
 .../index/DocumentsWriterPerThread.java            |  24 +-
 .../lucene/index/FrozenBufferedUpdates.java        | 125 ++++-----
 .../org/apache/lucene/index/IndexWriter.java       | 150 +++++++---
 .../lucene/index/ReadersAndUpdates.java            |   5 -
 .../index/TestDocumentsWriterDeleteQueue.java      |   5 +-
 .../apache/lucene/index/TestIndexWriter.java       | 259 ++++++++++++++++++
 9 files changed, 506 insertions(+), 128 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 7f336e2f9ac..6be005ddedc 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -94,7 +94,14 @@ Optimizations
 to run faster. (Adrien Grand)

 ======================= Lucene 7.4.0 =======================
-(No Changes)
+
+New Features
+
+* LUCENE-8200: Allow doc-values to be updated atomically together
+  with a document. Doc-values updates can now be used as a soft-delete
+  mechanism to allow keeping several versions of a document, or already
+  deleted documents, around for later reuse. See "IW.softUpdateDocument(...)"
+  for reference.
(Simon Willnauer) ======================= Lucene 7.3.0 ======================= diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 7ad4feb279e..d49c1dab0fe 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -433,7 +433,7 @@ final class DocumentsWriter implements Closeable, Accountable { } long updateDocuments(final Iterable> docs, final Analyzer analyzer, - final Term delTerm) throws IOException, AbortingException { + final DocumentsWriterDeleteQueue.Node delNode) throws IOException, AbortingException { boolean hasEvents = preUpdate(); final ThreadState perThread = flushControl.obtainAndLock(); @@ -449,7 +449,7 @@ final class DocumentsWriter implements Closeable, Accountable { final DocumentsWriterPerThread dwpt = perThread.dwpt; final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { - seqNo = dwpt.updateDocuments(docs, analyzer, delTerm); + seqNo = dwpt.updateDocuments(docs, analyzer, delNode); } catch (AbortingException ae) { flushControl.doOnAbort(perThread); dwpt.abort(); @@ -460,7 +460,7 @@ final class DocumentsWriter implements Closeable, Accountable { // accumulate our separate counter: numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs); } - final boolean isUpdate = delTerm != null; + final boolean isUpdate = delNode != null && delNode.isDelete(); flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; @@ -477,7 +477,7 @@ final class DocumentsWriter implements Closeable, Accountable { } long updateDocument(final Iterable doc, final Analyzer analyzer, - final Term delTerm) throws IOException, AbortingException { + final DocumentsWriterDeleteQueue.Node delNode) throws IOException, AbortingException { boolean hasEvents = preUpdate(); @@ -494,7 +494,7 @@ final class DocumentsWriter implements Closeable, Accountable { final DocumentsWriterPerThread dwpt = perThread.dwpt; final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { - seqNo = dwpt.updateDocument(doc, analyzer, delTerm); + seqNo = dwpt.updateDocument(doc, analyzer, delNode); } catch (AbortingException ae) { flushControl.doOnAbort(perThread); dwpt.abort(); @@ -505,7 +505,7 @@ final class DocumentsWriter implements Closeable, Accountable { // accumulate our separate counter: numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs); } - final boolean isUpdate = delTerm != null; + final boolean isUpdate = delNode != null && delNode.isDelete(); flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java index c4a084558cb..ad9c0d1627a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java @@ -56,7 +56,7 @@ import org.apache.lucene.util.InfoStream; *
  * <ol>
  * <li>consumes a document and finishes its processing</li>
  * <li>updates its private {@link DeleteSlice} either by calling
- * {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the
+ * {@link #updateSlice(DeleteSlice)} or {@link #add(Node, DeleteSlice)} (if the
  * document has a delTerm)</li>
  * <li>applies all deletes in the slice to its private {@link BufferedUpdates}
  * and resets it</li>
  6. @@ -131,13 +131,20 @@ final class DocumentsWriterDeleteQueue implements Accountable { tryApplyGlobalSlice(); return seqNo; } - + + static Node newNode(Term term) { + return new TermNode(term); + } + + static Node newNode(DocValuesUpdate... updates) { + return new DocValuesUpdatesNode(updates); + } + /** * invariant for document update */ - long add(Term term, DeleteSlice slice) { - final TermNode termNode = new TermNode(term); - long seqNo = add(termNode); + long add(Node deleteNode, DeleteSlice slice) { + long seqNo = add(deleteNode); /* * this is an update request where the term is the updated documents * delTerm. in that case we need to guarantee that this insert is atomic @@ -148,7 +155,7 @@ final class DocumentsWriterDeleteQueue implements Accountable { * will apply this delete next time we update our slice and one of the two * competing updates wins! */ - slice.sliceTail = termNode; + slice.sliceTail = deleteNode; assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add"; tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe // we can do it just every n times or so? @@ -291,12 +298,20 @@ final class DocumentsWriterDeleteQueue implements Accountable { sliceHead = sliceTail; } + /** + * Returns true iff the given node is identical to the the slices tail, + * otherwise false. + */ + boolean isTail(Node node) { + return sliceTail == node; + } + /** * Returns true iff the given item is identical to the item * hold by the slices tail, otherwise false. */ - boolean isTailItem(Object item) { - return sliceTail.item == item; + boolean isTailItem(Object object) { + return sliceTail.item == object; } boolean isEmpty() { @@ -319,7 +334,7 @@ final class DocumentsWriterDeleteQueue implements Accountable { } } - private static class Node { + static class Node { volatile Node next; final T item; @@ -330,6 +345,10 @@ final class DocumentsWriterDeleteQueue implements Accountable { void apply(BufferedUpdates bufferedDeletes, int docIDUpto) { throw new IllegalStateException("sentinel item must never be applied"); } + + boolean isDelete() { + return true; + } } private static final class TermNode extends Node { @@ -347,6 +366,7 @@ final class DocumentsWriterDeleteQueue implements Accountable { public String toString() { return "del=" + item; } + } private static final class QueryArrayNode extends Node { @@ -378,6 +398,7 @@ final class DocumentsWriterDeleteQueue implements Accountable { public String toString() { return "dels=" + Arrays.toString(item); } + } private static final class DocValuesUpdatesNode extends Node { @@ -402,6 +423,12 @@ final class DocumentsWriterDeleteQueue implements Accountable { } } + + @Override + boolean isDelete() { + return false; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 76c29065f74..3ee10d0cff6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -218,7 +218,7 @@ class DocumentsWriterPerThread { } } - public long updateDocument(Iterable doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { + public long updateDocument(Iterable doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node deleteNode) throws IOException, AbortingException { testPoint("DocumentsWriterPerThread 
addDocument start"); assert deleteQueue != null; reserveOneDoc(); @@ -226,7 +226,7 @@ class DocumentsWriterPerThread { docState.analyzer = analyzer; docState.docID = numDocsInRAM; if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { - infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name); + infoStream.message("DWPT", Thread.currentThread().getName() + " update delNode=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name); } // Even on exception, the document is still added (but marked // deleted), so we don't need to un-reserve at that point. @@ -250,15 +250,15 @@ class DocumentsWriterPerThread { } } - return finishDocument(delTerm); + return finishDocument(deleteNode); } - public long updateDocuments(Iterable> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { + public long updateDocuments(Iterable> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node deleteNode) throws IOException, AbortingException { testPoint("DocumentsWriterPerThread addDocuments start"); assert deleteQueue != null; docState.analyzer = analyzer; if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { - infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name); + infoStream.message("DWPT", Thread.currentThread().getName() + " update delNode=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name); } int docCount = 0; boolean allDocsIndexed = false; @@ -296,9 +296,9 @@ class DocumentsWriterPerThread { // succeeded, but apply it only to docs prior to when // this batch started: long seqNo; - if (delTerm != null) { - seqNo = deleteQueue.add(delTerm, deleteSlice); - assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; + if (deleteNode != null) { + seqNo = deleteQueue.add(deleteNode, deleteSlice); + assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail"; deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount); return seqNo; } else { @@ -328,7 +328,7 @@ class DocumentsWriterPerThread { } } - private long finishDocument(Term delTerm) { + private long finishDocument(DocumentsWriterDeleteQueue.Node deleteNode) { /* * here we actually finish the document in two steps 1. push the delete into * the queue and update our slice. 2. increment the DWPT private document @@ -339,9 +339,9 @@ class DocumentsWriterPerThread { */ boolean applySlice = numDocsInRAM != 0; long seqNo; - if (delTerm != null) { - seqNo = deleteQueue.add(delTerm, deleteSlice); - assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; + if (deleteNode != null) { + seqNo = deleteQueue.add(deleteNode, deleteSlice); + assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail"; } else { seqNo = deleteQueue.updateSlice(deleteSlice); diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java index 202bf2cc49a..1f8974a510d 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java @@ -141,42 +141,42 @@ class FrozenBufferedUpdates { throws IOException { // TODO: we could do better here, e.g. 
collate the updates by field // so if you are updating 2 fields interleaved we don't keep writing the field strings + try (RAMOutputStream out = new RAMOutputStream()) { + String lastTermField = null; + String lastUpdateField = null; + for (LinkedHashMap numericUpdates : numericDVUpdates.values()) { + numericDVUpdateCount += numericUpdates.size(); + for (NumericDocValuesUpdate update : numericUpdates.values()) { - RAMOutputStream out = new RAMOutputStream(); - String lastTermField = null; - String lastUpdateField = null; - for (LinkedHashMap numericUpdates : numericDVUpdates.values()) { - numericDVUpdateCount += numericUpdates.size(); - for (NumericDocValuesUpdate update : numericUpdates.values()) { + int code = update.term.bytes().length << 2; - int code = update.term.bytes().length << 2; + String termField = update.term.field(); + if (termField.equals(lastTermField) == false) { + code |= 1; + } + String updateField = update.field; + if (updateField.equals(lastUpdateField) == false) { + code |= 2; + } + out.writeVInt(code); + out.writeVInt(update.docIDUpto); + if ((code & 1) != 0) { + out.writeString(termField); + lastTermField = termField; + } + if ((code & 2) != 0) { + out.writeString(updateField); + lastUpdateField = updateField; + } - String termField = update.term.field(); - if (termField.equals(lastTermField) == false) { - code |= 1; + out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length); + out.writeZLong(((Long) update.value).longValue()); } - String updateField = update.field; - if (updateField.equals(lastUpdateField) == false) { - code |= 2; - } - out.writeVInt(code); - out.writeVInt(update.docIDUpto); - if ((code & 1) != 0) { - out.writeString(termField); - lastTermField = termField; - } - if ((code & 2) != 0) { - out.writeString(updateField); - lastUpdateField = updateField; - } - - out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length); - out.writeZLong(((Long) update.value).longValue()); } + byte[] bytes = new byte[(int) out.getFilePointer()]; + out.writeTo(bytes, 0); + return bytes; } - byte[] bytes = new byte[(int) out.getFilePointer()]; - out.writeTo(bytes, 0); - return bytes; } private byte[] freezeBinaryDVUpdates(Map> binaryDVUpdates) @@ -184,43 +184,44 @@ class FrozenBufferedUpdates { // TODO: we could do better here, e.g. 
collate the updates by field // so if you are updating 2 fields interleaved we don't keep writing the field strings - RAMOutputStream out = new RAMOutputStream(); - String lastTermField = null; - String lastUpdateField = null; - for (LinkedHashMap binaryUpdates : binaryDVUpdates.values()) { - binaryDVUpdateCount += binaryUpdates.size(); - for (BinaryDocValuesUpdate update : binaryUpdates.values()) { + try (RAMOutputStream out = new RAMOutputStream()) { + String lastTermField = null; + String lastUpdateField = null; + for (LinkedHashMap binaryUpdates : binaryDVUpdates.values()) { + binaryDVUpdateCount += binaryUpdates.size(); + for (BinaryDocValuesUpdate update : binaryUpdates.values()) { - int code = update.term.bytes().length << 2; + int code = update.term.bytes().length << 2; - String termField = update.term.field(); - if (termField.equals(lastTermField) == false) { - code |= 1; - } - String updateField = update.field; - if (updateField.equals(lastUpdateField) == false) { - code |= 2; - } - out.writeVInt(code); - out.writeVInt(update.docIDUpto); - if (termField.equals(lastTermField) == false) { - out.writeString(termField); - lastTermField = termField; - } - if (updateField.equals(lastUpdateField) == false) { - out.writeString(updateField); - lastUpdateField = updateField; - } - out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length); + String termField = update.term.field(); + if (termField.equals(lastTermField) == false) { + code |= 1; + } + String updateField = update.field; + if (updateField.equals(lastUpdateField) == false) { + code |= 2; + } + out.writeVInt(code); + out.writeVInt(update.docIDUpto); + if (termField.equals(lastTermField) == false) { + out.writeString(termField); + lastTermField = termField; + } + if (updateField.equals(lastUpdateField) == false) { + out.writeString(updateField); + lastUpdateField = updateField; + } + out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length); - BytesRef value = (BytesRef) update.value; - out.writeVInt(value.length); - out.writeBytes(value.bytes, value.offset, value.length); + BytesRef value = (BytesRef) update.value; + out.writeVInt(value.length); + out.writeBytes(value.bytes, value.offset, value.length); + } } + byte[] bytes = new byte[(int) out.getFilePointer()]; + out.writeTo(bytes, 0); + return bytes; } - byte[] bytes = new byte[(int) out.getFilePointer()]; - out.writeTo(bytes, 0); - return bytes; } /** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index dcc3be5d071..6de8d3bf991 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -1459,7 +1459,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * @throws IOException if there is a low-level IO error */ public long addDocument(Iterable doc) throws IOException { - return updateDocument(null, doc); + return updateDocument((DocumentsWriterDeleteQueue.Node) null, doc); } /** @@ -1503,7 +1503,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * @lucene.experimental */ public long addDocuments(Iterable> docs) throws IOException { - return updateDocuments(null, docs); + return updateDocuments((DocumentsWriterDeleteQueue.Node) null, docs); } /** @@ -1523,11 +1523,15 @@ 
public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * @lucene.experimental */ public long updateDocuments(Term delTerm, Iterable> docs) throws IOException { + return updateDocuments(delTerm == null ? null : DocumentsWriterDeleteQueue.newNode(delTerm), docs); + } + + private long updateDocuments(final DocumentsWriterDeleteQueue.Node delNode, Iterable> docs) throws IOException { ensureOpen(); try { boolean success = false; try { - long seqNo = docWriter.updateDocuments(docs, analyzer, delTerm); + long seqNo = docWriter.updateDocuments(docs, analyzer, delNode); if (seqNo < 0) { seqNo = -seqNo; processEvents(true, false); @@ -1549,6 +1553,48 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } } + /** + * Expert: + * Atomically updates documents matching the provided + * term with the given doc-values fields + * and adds a block of documents with sequentially + * assigned document IDs, such that an external reader + * will see all or none of the documents. + * + * One use of this API is to retain older versions of + * documents instead of replacing them. The existing + * documents can be updated to reflect they are no + * longer current while atomically adding new documents + * at the same time. + * + * In contrast to {@link #updateDocuments(Term, Iterable)} + * this method will not delete documents in the index + * matching the given term but instead update them with + * the given doc-values fields which can be used as a + * soft-delete mechanism. + * + * See {@link #addDocuments(Iterable)} + * and {@link #updateDocuments(Term, Iterable)}. + * + * + * @return The sequence number + * for this operation + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public long softUpdateDocuments(Term term, Iterable> docs, Field... softDeletes) throws IOException { + if (term == null) { + throw new IllegalArgumentException("term must not be null"); + } + if (softDeletes == null || softDeletes.length == 0) { + throw new IllegalArgumentException("at least one soft delete must be present"); + } + return updateDocuments(DocumentsWriterDeleteQueue.newNode(buildDocValuesUpdate(term, softDeletes, false)), docs); + } + /** Expert: attempts to delete by document ID, as long as * the provided reader is a near-real-time reader (from {@link * DirectoryReader#open(IndexWriter)}). If the @@ -1720,11 +1766,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { * @throws IOException if there is a low-level IO error */ public long updateDocument(Term term, Iterable doc) throws IOException { + return updateDocument(term == null ? null : DocumentsWriterDeleteQueue.newNode(term), doc); + } + + private long updateDocument(final DocumentsWriterDeleteQueue.Node delNode, + Iterable doc) throws IOException { ensureOpen(); try { boolean success = false; try { - long seqNo = docWriter.updateDocument(doc, analyzer, term); + long seqNo = docWriter.updateDocument(doc, analyzer, delNode); if (seqNo < 0) { seqNo = - seqNo; processEvents(true, false); @@ -1746,6 +1797,50 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } } + + /** + * Expert: + * Updates a document by first updating the document(s) + * containing term with the given doc-values fields + * and then adding the new document. The doc-values update and + * then add are atomic as seen by a reader on the same index + * (flush may happen only after the add). 
+ * + * One use of this API is to retain older versions of + * documents instead of replacing them. The existing + * documents can be updated to reflect they are no + * longer current while atomically adding new documents + * at the same time. + * + * In contrast to {@link #updateDocument(Term, Iterable)} + * this method will not delete documents in the index + * matching the given term but instead update them with + * the given doc-values fields which can be used as a + * soft-delete mechanism. + * + * See {@link #addDocuments(Iterable)} + * and {@link #updateDocuments(Term, Iterable)}. + * + * + * @return The sequence number + * for this operation + * + * @throws CorruptIndexException if the index is corrupt + * @throws IOException if there is a low-level IO error + * + * @lucene.experimental + */ + public long softUpdateDocument(Term term, Iterable doc, Field... softDeletes) throws IOException { + if (term == null) { + throw new IllegalArgumentException("term must not be null"); + } + if (softDeletes == null || softDeletes.length == 0) { + throw new IllegalArgumentException("at least one soft delete must be present"); + } + return updateDocument(DocumentsWriterDeleteQueue.newNode(buildDocValuesUpdate(term, softDeletes, false)), doc); + } + + /** * Updates a document's {@link NumericDocValues} for field to the * given value. You can only update fields that already exist in @@ -1855,6 +1950,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { */ public long updateDocValues(Term term, Field... updates) throws IOException { ensureOpen(); + DocValuesUpdate[] dvUpdates = buildDocValuesUpdate(term, updates, true); + try { + long seqNo = docWriter.updateDocValues(dvUpdates); + if (seqNo < 0) { + seqNo = -seqNo; + processEvents(true, false); + } + return seqNo; + } catch (VirtualMachineError tragedy) { + tragicEvent(tragedy, "updateDocValues"); + + // dead code but javac disagrees: + return -1; + } + } + + private DocValuesUpdate[] buildDocValuesUpdate(Term term, Field[] updates, boolean enforceFieldExistence) { DocValuesUpdate[] dvUpdates = new DocValuesUpdate[updates.length]; for (int i = 0; i < updates.length; i++) { final Field f = updates[i]; @@ -1865,7 +1977,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (dvType == DocValuesType.NONE) { throw new IllegalArgumentException("can only update NUMERIC or BINARY fields! field=" + f.name()); } - if (!globalFieldNumberMap.contains(f.name(), dvType)) { + if (enforceFieldExistence && !globalFieldNumberMap.contains(f.name(), dvType)) { throw new IllegalArgumentException("can only update existing docvalues fields! 
field=" + f.name() + ", type=" + dvType); } if (config.getIndexSortFields().contains(f.name())) { @@ -1882,21 +1994,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { throw new IllegalArgumentException("can only update NUMERIC or BINARY fields: field=" + f.name() + ", type=" + dvType); } } - try { - long seqNo = docWriter.updateDocValues(dvUpdates); - if (seqNo < 0) { - seqNo = -seqNo; - processEvents(true, false); - } - return seqNo; - } catch (VirtualMachineError tragedy) { - tragicEvent(tragedy, "updateDocValues"); - - // dead code but javac disagrees: - return -1; - } + return dvUpdates; } - + // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); @@ -3693,18 +3793,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } } - private static class MergedDeletesAndUpdates { - ReadersAndUpdates mergedDeletesAndUpdates = null; - - MergedDeletesAndUpdates() {} - - final void init(ReaderPool readerPool, MergePolicy.OneMerge merge) throws IOException { - if (mergedDeletesAndUpdates == null) { - mergedDeletesAndUpdates = readerPool.get(merge.info, true); - } - } - } - /** * Carefully merges deletes and updates for the segments we just merged. This * is tricky because, although merging will clear all deletes (compacts the diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java index 0e322562268..27c1332b3da 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java @@ -291,11 +291,6 @@ class ReadersAndUpdates { return liveDocs; } - public synchronized Bits getReadOnlyLiveDocs() { - liveDocsShared = true; - return liveDocs; - } - public synchronized void dropChanges() { assert Thread.holdsLock(writer); // Discard (don't save) changes when we are dropping diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java index 8991aea0588..cb812a350ea 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java @@ -236,8 +236,9 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase { int i = 0; while ((i = index.getAndIncrement()) < ids.length) { Term term = new Term("id", ids[i].toString()); - queue.add(term, slice); - assertTrue(slice.isTailItem(term)); + DocumentsWriterDeleteQueue.Node termNode = DocumentsWriterDeleteQueue.newNode(term); + queue.add(termNode, slice); + assertTrue(slice.isTail(termNode)); slice.apply(deletes, BufferedUpdates.MAX_INT); } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index 983581ac3b7..814d816699e 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.io.StringReader; +import java.io.UncheckedIOException; import java.net.URI; import java.nio.file.FileSystem; import java.nio.file.Files; @@ -71,6 +72,7 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.PhraseQuery; import 
org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.Directory; @@ -86,6 +88,7 @@ import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.store.SimpleFSLockFactory; +import org.apache.lucene.util.BitSet; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; @@ -2955,5 +2958,261 @@ public class TestIndexWriter extends LuceneTestCase { } } } + private static Bits getSoftDeletesLiveDocs(LeafReader reader, String field) { + try { + NumericDocValues softDelete = reader.getNumericDocValues(field); + if (softDelete != null) { + BitSet bitSet = BitSet.of(softDelete, reader.maxDoc()); + Bits inLiveDocs = reader.getLiveDocs() == null ? new Bits.MatchAllBits(reader.maxDoc()) : reader.getLiveDocs(); + Bits newliveDocs = new Bits() { + @Override + public boolean get(int index) { + return inLiveDocs.get(index) && bitSet.get(index) == false; + } + + @Override + public int length() { + return inLiveDocs.length(); + } + }; + return newliveDocs; + + } else { + return reader.getLiveDocs(); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static DirectoryReader wrapSoftDeletes(DirectoryReader reader, String field) throws IOException { + return new FilterDirectoryReader(reader, new FilterDirectoryReader.SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + Bits softDeletesLiveDocs = getSoftDeletesLiveDocs(reader, field); + int numDocs = getNumDocs(reader, softDeletesLiveDocs); + return new FilterLeafReader(reader) { + + @Override + public Bits getLiveDocs() { + return softDeletesLiveDocs; + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public int numDocs() { + return numDocs; + } + }; + } + }) { + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return wrapSoftDeletes(in, field); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + }; + } + + private static int getNumDocs(LeafReader reader, Bits softDeletesLiveDocs) { + int numDocs; + if (softDeletesLiveDocs == reader.getLiveDocs()) { + numDocs = reader.numDocs(); + } else { + int tmp = 0; + for (int i = 0; i < softDeletesLiveDocs.length(); i++) { + if (softDeletesLiveDocs.get(i) ) { + tmp++; + } + } + numDocs = tmp; + } + return numDocs; + } + + public void testSoftUpdateDocuments() throws IOException { + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + expectThrows(IllegalArgumentException.class, () -> { + writer.softUpdateDocument(null, new Document(), new NumericDocValuesField("soft_delete", 1)); + }); + + expectThrows(IllegalArgumentException.class, () -> { + writer.softUpdateDocument(new Term("id", "1"), new Document()); + }); + + expectThrows(IllegalArgumentException.class, () -> { + writer.softUpdateDocuments(null, Arrays.asList(new Document()), new NumericDocValuesField("soft_delete", 1)); + }); + + expectThrows(IllegalArgumentException.class, () -> { + writer.softUpdateDocuments(new 
Term("id", "1"), Arrays.asList(new Document())); + }); + + Document doc = new Document(); + doc.add(new StringField("id", "1", Field.Store.YES)); + doc.add(new StringField("version", "1", Field.Store.YES)); + writer.addDocument(doc); + doc = new Document(); + doc.add(new StringField("id", "1", Field.Store.YES)); + doc.add(new StringField("version", "2", Field.Store.YES)); + Field field = new NumericDocValuesField("soft_delete", 1); + writer.softUpdateDocument(new Term("id", "1"), doc, field); + DirectoryReader reader = wrapSoftDeletes(DirectoryReader.open(writer), "soft_delete"); + assertEquals(2, reader.docFreq(new Term("id", "1"))); + IndexSearcher searcher = new IndexSearcher(reader); + TopDocs topDocs = searcher.search(new TermQuery(new Term("id", "1")), 10); + assertEquals(1, topDocs.totalHits); + Document document = reader.document(topDocs.scoreDocs[0].doc); + assertEquals("2", document.get("version")); + + // update the on-disk version + doc = new Document(); + doc.add(new StringField("id", "1", Field.Store.YES)); + doc.add(new StringField("version", "3", Field.Store.YES)); + field = new NumericDocValuesField("soft_delete", 1); + writer.softUpdateDocument(new Term("id", "1"), doc, field); + DirectoryReader oldReader = reader; + reader = DirectoryReader.openIfChanged(reader, writer); + assertNotSame(reader, oldReader); + oldReader.close(); + searcher = new IndexSearcher(reader); + topDocs = searcher.search(new TermQuery(new Term("id", "1")), 10); + assertEquals(1, topDocs.totalHits); + document = reader.document(topDocs.scoreDocs[0].doc); + assertEquals("3", document.get("version")); + + // now delete it + writer.updateDocValues(new Term("id", "1"), field); + oldReader = reader; + reader = DirectoryReader.openIfChanged(reader, writer); + assertNotSame(reader, oldReader); + oldReader.close(); + searcher = new IndexSearcher(reader); + topDocs = searcher.search(new TermQuery(new Term("id", "1")), 10); + assertEquals(0, topDocs.totalHits); + + writer.close(); + reader.close(); + dir.close(); + } + + public void testSoftUpdatesConcurrently() throws IOException, InterruptedException { + Directory dir = newDirectory(); + IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); + AtomicBoolean mergeAwaySoftDeletes = new AtomicBoolean(random().nextBoolean()); + + indexWriterConfig.setMergePolicy(new OneMergeWrappingMergePolicy(indexWriterConfig.getMergePolicy(), towrap -> + new MergePolicy.OneMerge(towrap.segments) { + @Override + public CodecReader wrapForMerge(CodecReader reader) throws IOException { + if (mergeAwaySoftDeletes.get() == false) { + return towrap.wrapForMerge(reader); + } + Bits softDeletesLiveDocs = getSoftDeletesLiveDocs(reader, "soft_delete"); + int numDocs = getNumDocs(reader, softDeletesLiveDocs); + CodecReader wrapped = towrap.wrapForMerge(reader); + return new FilterCodecReader(wrapped) { + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + + @Override + public Bits getLiveDocs() { + return softDeletesLiveDocs; + } + + @Override + public int numDocs() { + return numDocs; + } + }; + } + } + )); + IndexWriter writer = new IndexWriter(dir, indexWriterConfig); + Thread[] threads = new Thread[2 + random().nextInt(3)]; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch started = new CountDownLatch(threads.length); + boolean updateSeveralDocs = random().nextBoolean(); + Set ids = Collections.synchronizedSet(new 
HashSet<>()); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + try { + started.countDown(); + startLatch.await(); + for (int d = 0; d < 100; d++) { + String id = String.valueOf(random().nextInt(10)); + if (updateSeveralDocs) { + Document doc = new Document(); + doc.add(new StringField("id", id, Field.Store.YES)); + writer.softUpdateDocuments(new Term("id", id), Arrays.asList(doc, doc), + new NumericDocValuesField("soft_delete", 1)); + } else { + Document doc = new Document(); + doc.add(new StringField("id", id, Field.Store.YES)); + writer.softUpdateDocument(new Term("id", id), doc, + new NumericDocValuesField("soft_delete", 1)); + } + ids.add(id); + } + } catch (IOException | InterruptedException e) { + throw new AssertionError(e); + } + }); + threads[i].start(); + } + started.await(); + startLatch.countDown(); + + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + DirectoryReader reader = wrapSoftDeletes(DirectoryReader.open(writer), "soft_delete"); + IndexSearcher searcher = new IndexSearcher(reader); + for (String id : ids) { + TopDocs topDocs = searcher.search(new TermQuery(new Term("id", id)), 10); + if (updateSeveralDocs) { + assertEquals(2, topDocs.totalHits); + assertEquals(Math.abs(topDocs.scoreDocs[0].doc - topDocs.scoreDocs[1].doc), 1); + } else { + assertEquals(1, topDocs.totalHits); + } + } + mergeAwaySoftDeletes.set(true); + writer.forceMerge(1); + DirectoryReader oldReader = reader; + reader = DirectoryReader.openIfChanged(reader, writer); + assertNotSame(oldReader, reader); + oldReader.close(); + for (String id : ids) { + if (updateSeveralDocs) { + assertEquals(2, reader.docFreq(new Term("id", id))); + } else { + assertEquals(1, reader.docFreq(new Term("id", id))); + } + } + + IOUtils.close(reader, writer, dir); + } }
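
For reference, the following is a minimal, self-contained sketch (not part of the patch) of how the new IndexWriter.softUpdateDocument API introduced above could be used once this change is applied. It mirrors testSoftUpdateDocuments; the class name SoftUpdateExample and the "soft_delete" field name are illustrative only. Note that a plain DirectoryReader still sees soft-deleted documents: callers are expected to filter on the doc-values field themselves, for example with a FilterDirectoryReader like the wrapSoftDeletes helper in the test.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class SoftUpdateExample {
  public static void main(String[] args) throws Exception {
    try (Directory dir = new RAMDirectory(); // in-memory directory, fine for a demo
         IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {

      // index the first version of the document
      Document v1 = new Document();
      v1.add(new StringField("id", "1", Field.Store.YES));
      v1.add(new StringField("version", "1", Field.Store.YES));
      writer.addDocument(v1);

      // "replace" it: the old document is not deleted, it only receives the
      // soft_delete doc-values field, atomically with adding the new version
      Document v2 = new Document();
      v2.add(new StringField("id", "1", Field.Store.YES));
      v2.add(new StringField("version", "2", Field.Store.YES));
      writer.softUpdateDocument(new Term("id", "1"), v2,
          new NumericDocValuesField("soft_delete", 1));

      try (DirectoryReader reader = DirectoryReader.open(writer)) {
        // both versions are still physically present in the index ...
        System.out.println("docFreq(id:1) = " + reader.docFreq(new Term("id", "1"))); // prints 2

        // ... and an unfiltered searcher still matches both of them; hiding the
        // soft-deleted document requires filtering on the soft_delete field,
        // e.g. with a FilterDirectoryReader such as wrapSoftDeletes(...) above
        IndexSearcher searcher = new IndexSearcher(reader);
        System.out.println("hits = " + searcher.search(
            new TermQuery(new Term("id", "1")), 10).totalHits); // 2 without filtering
      }
    }
  }
}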