From b0b32931b28da83d47f0561f4da734d6b9ee6e16 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Sat, 5 May 2018 09:55:58 +0200
Subject: [PATCH] LUCENE-8297: Add IW#tryUpdateDocValues(Reader, int, Fields...)

IndexWriter can update doc values for a specific term, but this might
affect all documents containing the term. With tryUpdateDocValues users
can update doc-values fields for individual documents. This allows,
for instance, soft-deleting individual documents.

The new method shares most of its code with tryDeleteDocument.
---
 lucene/CHANGES.txt                            |   6 +
 .../apache/lucene/index/DocValuesUpdate.java  |   4 +-
 .../org/apache/lucene/index/IndexWriter.java  |  89 ++++++++--
 .../index/TestMixedDocValuesUpdates.java      | 154 +++++++++++++++++-
 4 files changed, 235 insertions(+), 18 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 5ded39ab692..ec68882864e 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -155,6 +155,12 @@ New Features
 * LUCENE-8265: WordDelimter/GraphFilter now have an option to skip tokens
   marked with KeywordAttribute (Mike Sokolov via Mike McCandless)
 
+* LUCENE-8297: Add IW#tryUpdateDocValues(Reader, int, Fields...). IndexWriter
+  can update doc values for a specific term, but this might affect all
+  documents containing the term. With tryUpdateDocValues users can update
+  doc-values fields for individual documents. This allows, for instance,
+  soft-deleting individual documents. (Simon Willnauer)
+
 Bug Fixes
 
 * LUCENE-8266: Detect bogus tiles when creating a standard polygon and
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
index 8229b6094fc..c8bc8fbf05d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
@@ -83,7 +83,7 @@ abstract class DocValuesUpdate {
 
   /** An in-place update to a binary DocValues field */
   static final class BinaryDocValuesUpdate extends DocValuesUpdate {
-    private final BytesRef value;
+    final BytesRef value;
 
     /* Size of BytesRef: 2*INT + ARRAY_HEADER + PTR */
     private static final long RAW_VALUE_SIZE_IN_BYTES = NUM_BYTES_ARRAY_HEADER + 2*Integer.BYTES + NUM_BYTES_OBJECT_REF;
@@ -132,7 +132,7 @@ abstract class DocValuesUpdate {
 
   /** An in-place update to a numeric DocValues field */
   static final class NumericDocValuesUpdate extends DocValuesUpdate {
-    private final long value;
+    final long value;
 
     NumericDocValuesUpdate(Term term, String field, long value) {
       this(term, field, value, BufferedUpdates.MAX_INT);
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 93f54463556..d8ef5c04c8e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1347,7 +1347,82 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    *  to delete documents indexed after opening the NRT
    *  reader you must use {@link #deleteDocuments(Term...)}). */
   public synchronized long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
+    // NOTE: DON'T use docID inside the closure
+    return tryModifyDocument(readerIn, docID, (leafDocId, rld) -> {
+      if (rld.delete(leafDocId)) {
+        if (isFullyDeleted(rld)) {
+          dropDeletedSegment(rld.info);
+          checkpoint();
+        }
+        // Must bump changeCount so if no other changes
+        // happened, we still commit this change:
+        changed();
+      }
+    });
+  }
+
+  /** Expert: attempts to update doc values by document ID, as long as
+   *  the provided reader is a near-real-time reader (from {@link
+   *  DirectoryReader#open(IndexWriter)}). If the
+   *  provided reader is an NRT reader obtained from this
+   *  writer, and its segment has not been merged away, then
+   *  the update succeeds and this method returns a valid (> 0) sequence
+   *  number; else, it returns -1 and the caller must then
+   *  retry the update after re-resolving the document.
+   *
+   *  NOTE: this method can only update documents
+   *  visible to the currently open NRT reader. If you need
+   *  to update documents indexed after opening the NRT
+   *  reader you must use {@link #updateDocValues(Term, Field...)}. */
+  public synchronized long tryUpdateDocValue(IndexReader readerIn, int docID, Field... fields) throws IOException {
+    // NOTE: DON'T use docID inside the closure
+    final DocValuesUpdate[] dvUpdates = buildDocValuesUpdate(null, fields);
+    return tryModifyDocument(readerIn, docID, (leafDocId, rld) -> {
+      long nextGen = bufferedUpdatesStream.getNextGen();
+      try {
+        Map<String, DocValuesFieldUpdates> fieldUpdatesMap = new HashMap<>();
+        for (DocValuesUpdate update : dvUpdates) {
+          DocValuesFieldUpdates docValuesFieldUpdates = fieldUpdatesMap.computeIfAbsent(update.field, k -> {
+            switch (update.type) {
+              case NUMERIC:
+                return new NumericDocValuesFieldUpdates(nextGen, k, rld.info.info.maxDoc());
+              case BINARY:
+                return new BinaryDocValuesFieldUpdates(nextGen, k, rld.info.info.maxDoc());
+              default:
+                throw new AssertionError("type: " + update.type + " is not supported");
+            }
+          });
+          switch (update.type) {
+            case NUMERIC:
+              docValuesFieldUpdates.add(leafDocId, ((NumericDocValuesUpdate) update).value);
+              break;
+            case BINARY:
+              docValuesFieldUpdates.add(leafDocId, ((BinaryDocValuesUpdate) update).value);
+              break;
+            default:
+              throw new AssertionError("type: " + update.type + " is not supported");
+          }
+        }
+        for (DocValuesFieldUpdates updates : fieldUpdatesMap.values()) {
+          updates.finish();
+          rld.addDVUpdate(updates);
+        }
+      } finally {
+        bufferedUpdatesStream.finishedSegment(nextGen);
+      }
+      // Must bump changeCount so if no other changes
+      // happened, we still commit this change:
+      changed();
+    });
+  }
+
+  @FunctionalInterface
+  private interface DocModifier {
+    void run(int docId, ReadersAndUpdates readersAndUpdates) throws IOException;
+  }
+
+  private synchronized long tryModifyDocument(IndexReader readerIn, int docID, DocModifier toApply) throws IOException {
     final LeafReader reader;
     if (readerIn instanceof LeafReader) {
       // Reader is already atomic: use the incoming docID:
@@ -1365,7 +1440,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     if (!(reader instanceof SegmentReader)) {
       throw new IllegalArgumentException("the reader must be a SegmentReader or composite reader containing only SegmentReaders");
     }
-    
+
     final SegmentCommitInfo info = ((SegmentReader) reader).getSegmentInfo();
 
     // TODO: this is a slow linear search, but, number of
@@ -1377,21 +1452,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       ReadersAndUpdates rld = getPooledInstance(info, false);
       if (rld != null) {
         synchronized(bufferedUpdatesStream) {
-          if (rld.delete(docID)) {
-            if (isFullyDeleted(rld)) {
-              dropDeletedSegment(rld.info);
-              checkpoint();
-            }
-
-            // Must bump changeCount so if no other changes
-            // happened, we still commit this change:
-            changed();
-          }
+          toApply.run(docID, rld);
           return docWriter.deleteQueue.getNextSequenceNumber();
         }
       }
     }
-
     return -1;
   }
 
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMixedDocValuesUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestMixedDocValuesUpdates.java
index f40379f9ad7..401de4df476 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestMixedDocValuesUpdates.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestMixedDocValuesUpdates.java
@@ -18,17 +18,25 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.BinaryDocValuesField;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.StringField;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
@@ -258,7 +266,6 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
     writer.close();
     
     DirectoryReader reader = DirectoryReader.open(dir);
-    BytesRef scratch = new BytesRef();
     for (LeafReaderContext context : reader.leaves()) {
       LeafReader r = context.reader();
       for (int i = 0; i < numFields; i++) {
@@ -305,8 +312,14 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
       int doc = random().nextInt(numDocs);
       Term t = new Term("id", "doc" + doc);
       long value = random().nextLong();
-      writer.updateDocValues(t, new BinaryDocValuesField("f", TestBinaryDocValuesUpdates.toBytes(value)),
-          new NumericDocValuesField("cf", value*2));
+      if (random().nextBoolean()) {
+        doUpdate(t, writer, new BinaryDocValuesField("f", TestBinaryDocValuesUpdates.toBytes(value)),
+            new NumericDocValuesField("cf", value*2));
+      } else {
+        writer.updateDocValues(t, new BinaryDocValuesField("f", TestBinaryDocValuesUpdates.toBytes(value)),
+            new NumericDocValuesField("cf", value*2));
+      }
+
       DirectoryReader reader = DirectoryReader.open(writer);
       for (LeafReaderContext context : reader.leaves()) {
         LeafReader r = context.reader();
@@ -394,5 +407,138 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
     
     dir.close();
   }
-  
+
+  public void testTryUpdateDocValues() throws IOException {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = newIndexWriterConfig();
+    IndexWriter writer = new IndexWriter(dir, conf);
+    int numDocs = 1 + random().nextInt(128);
+    for (int i = 0; i < numDocs; i++) {
+      Document doc = new Document();
+      doc.add(new StringField("id", "" + i, Store.YES));
+      doc.add(new NumericDocValuesField("id", i));
+      doc.add(new BinaryDocValuesField("binaryId", new BytesRef(new byte[] {(byte)i})));
+      writer.addDocument(doc);
+      if (random().nextBoolean()) {
+        writer.flush();
+      }
+    }
+    int doc = random().nextInt(numDocs);
+    doUpdate(new Term("id", "" + doc), writer, new NumericDocValuesField("id", doc + 1),
+        new BinaryDocValuesField("binaryId", new BytesRef(new byte[]{(byte) (doc + 1)})));
+    IndexReader reader = writer.getReader();
+    NumericDocValues idValues = null;
+    BinaryDocValues binaryIdValues = null;
+    for (LeafReaderContext c : reader.leaves()) {
+      TopDocs topDocs = new IndexSearcher(c.reader()).search(new TermQuery(new Term("id", "" + doc)), 10);
+      if (topDocs.totalHits == 1) {
+        assertNull(idValues);
+        assertNull(binaryIdValues);
+        idValues = c.reader().getNumericDocValues("id");
+        assertEquals(topDocs.scoreDocs[0].doc, idValues.advance(topDocs.scoreDocs[0].doc));
+        binaryIdValues = c.reader().getBinaryDocValues("binaryId");
+        assertEquals(topDocs.scoreDocs[0].doc, binaryIdValues.advance(topDocs.scoreDocs[0].doc));
+      } else {
+        assertEquals(0, topDocs.totalHits);
+      }
+    }
+
+    assertNotNull(idValues);
+    assertNotNull(binaryIdValues);
+
+    assertEquals(doc+1, idValues.longValue());
+    assertEquals(new BytesRef(new byte[] {(byte)(doc+1)}), binaryIdValues.binaryValue());
+    IOUtils.close(reader, writer, dir);
+  }
+
+  public void testTryUpdateMultiThreaded() throws IOException, BrokenBarrierException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = newIndexWriterConfig();
+    IndexWriter writer = new IndexWriter(dir, conf);
+    ReentrantLock[] locks = new ReentrantLock[25 + random().nextInt(50)];
+    int[] values = new int[locks.length];
+    for (int i = 0; i < locks.length; i++) {
+      locks[i] = new ReentrantLock();
+      Document doc = new Document();
+      values[i] = random().nextInt();
+      doc.add(new StringField("id", Integer.toString(i), Store.NO));
+      doc.add(new NumericDocValuesField("value", values[i]));
+      writer.addDocument(doc);
+    }
+
+    Thread[] threads = new Thread[2 + random().nextInt(3)];
+    CyclicBarrier barrier = new CyclicBarrier(threads.length + 1);
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new Thread(() -> {
+        try {
+          barrier.await();
+          for (int doc = 0; doc < 1000; doc++) {
+            int docId = random().nextInt(locks.length);
+            locks[docId].lock();
+            try {
+              int value = random().nextInt();
+              if (random().nextBoolean()) {
+                writer.updateDocValues(new Term("id", docId + ""), new NumericDocValuesField("value", value));
+              } else {
+                doUpdate(new Term("id", docId + ""), writer, new NumericDocValuesField("value", value));
+              }
+              values[docId] = value;
+            } catch (IOException e) {
+              throw new AssertionError(e);
+            } finally {
+              locks[docId].unlock();
+            }
+
+            if (rarely()) {
+              writer.flush();
+            }
+          }
+        } catch (Exception e) {
+          throw new AssertionError(e);
+        }
+      });
+      threads[i].start();
+    }
+
+    barrier.await();
+    for (Thread t : threads) {
+      t.join();
+    }
+    try (DirectoryReader reader = writer.getReader()) {
+      for (int i = 0; i < locks.length; i++) {
+        locks[i].lock();
+        try {
+          int value = values[i];
+          TopDocs topDocs = new IndexSearcher(reader).search(new TermQuery(new Term("id", "" + i)), 10);
+          assertEquals(1, topDocs.totalHits);
+          int docID = topDocs.scoreDocs[0].doc;
+          List<LeafReaderContext> leaves = reader.leaves();
+          int subIndex = ReaderUtil.subIndex(docID, leaves);
+          LeafReader leafReader = leaves.get(subIndex).reader();
+          docID -= leaves.get(subIndex).docBase;
+          NumericDocValues numericDocValues = leafReader.getNumericDocValues("value");
+          assertEquals(docID, numericDocValues.advance(docID));
+          assertEquals(value, numericDocValues.longValue());
+        } finally {
+          locks[i].unlock();
+        }
+
+      }
+    }
+
+    IOUtils.close(writer, dir);
+  }
+
+  static void doUpdate(Term doc, IndexWriter writer, Field... fields) throws IOException {
+    long seqId = -1;
+    do { // retry if the segment containing the document was merged away concurrently
+      try (DirectoryReader reader = writer.getReader()) {
+        TopDocs topDocs = new IndexSearcher(reader).search(new TermQuery(doc), 10);
+        assertEquals(1, topDocs.totalHits);
+        int theDoc = topDocs.scoreDocs[0].doc;
+        seqId = writer.tryUpdateDocValue(reader, theDoc, fields);
+      }
+    } while (seqId == -1);
+  }
 }
+
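For illustration, the soft-delete use case mentioned in the commit message might
look like the sketch below. This is a minimal sketch, not part of the patch: the
"soft_delete" field name, the SoftDeleteExample class, and the single-term lookup
are assumptions made up for this example; only tryUpdateDocValue and
DirectoryReader#open(IndexWriter) come from the API above.

import java.io.IOException;

import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;

final class SoftDeleteExample {
  /** Marks the single document matching idTerm as soft-deleted by setting a
   *  numeric doc-values flag on it, without touching other documents. */
  static void softDelete(IndexWriter writer, Term idTerm) throws IOException {
    long seqNo = -1;
    do {
      // Resolve the docID against a fresh NRT reader; tryUpdateDocValue only
      // sees documents visible to this reader.
      try (DirectoryReader reader = DirectoryReader.open(writer)) {
        TopDocs hits = new IndexSearcher(reader).search(new TermQuery(idTerm), 1);
        if (hits.totalHits == 0) {
          return; // document not visible to the NRT reader
        }
        // -1 means the segment holding the document was merged away between
        // resolving the docID and applying the update: re-resolve and retry.
        seqNo = writer.tryUpdateDocValue(reader, hits.scoreDocs[0].doc,
            new NumericDocValuesField("soft_delete", 1));
      }
    } while (seqNo == -1);
  }
}

The retry loop mirrors doUpdate in the test above: any -1 return simply causes
the docID to be re-resolved against a new NRT reader.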
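A second sketch, under the same assumptions (same imports; the "category" and
"rank" field names are illustrative, not from the patch), contrasts the two
update paths described in the javadoc: updateDocValues(Term, Field...) affects
every document containing the term, including documents indexed after the NRT
reader was opened, while tryUpdateDocValue affects exactly one NRT-visible
document.

// Continues the SoftDeleteExample sketch above (same imports).
static void rankOneNewsDoc(IndexWriter writer) throws IOException {
  // Term-based API: sets "rank" on EVERY document containing the term,
  // including documents not yet visible to any open NRT reader.
  writer.updateDocValues(new Term("category", "news"),
      new NumericDocValuesField("rank", 42L));

  // DocID-based API: sets "rank" on exactly one NRT-visible document.
  // A -1 return (segment merged away) would require re-resolving the
  // docID and retrying, as softDelete above does.
  try (DirectoryReader reader = DirectoryReader.open(writer)) {
    TopDocs hits = new IndexSearcher(reader).search(
        new TermQuery(new Term("category", "news")), 1);
    if (hits.totalHits > 0) {
      writer.tryUpdateDocValue(reader, hits.scoreDocs[0].doc,
          new NumericDocValuesField("rank", 42L));
    }
  }
}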