mirror of https://github.com/apache/lucene.git
LUCENE-8297: Add IW#tryUpdateDocValues(Reader, int, Fields...)
IndexWriter can update doc values for a specific term but this might affect all documents containing the term. With tryUpdateDocValues users can update doc-values fields for individual documents. This allows for instance to soft-delete individual documents. The new method shares most of it's code with tryDeleteDocuments.
This commit is contained in:
parent
6521d86dde
commit
b0b32931b2
|
@ -155,6 +155,12 @@ New Features
|
|||
* LUCENE-8265: WordDelimter/GraphFilter now have an option to skip tokens
|
||||
marked with KeywordAttribute (Mike Sokolov via Mike McCandless)
|
||||
|
||||
* LUCENE-8297: Add IW#tryUpdateDocValues(Reader, int, Fields...) IndexWriter can
|
||||
update doc values for a specific term but this might affect all documents
|
||||
containing the term. With tryUpdateDocValues users can update doc-values
|
||||
fields for individual documents. This allows for instance to soft-delete
|
||||
individual documents. (Simon Willnauer)
|
||||
|
||||
Bug Fixes
|
||||
|
||||
* LUCENE-8266: Detect bogus tiles when creating a standard polygon and
|
||||
|
|
|
@ -83,7 +83,7 @@ abstract class DocValuesUpdate {
|
|||
|
||||
/** An in-place update to a binary DocValues field */
|
||||
static final class BinaryDocValuesUpdate extends DocValuesUpdate {
|
||||
private final BytesRef value;
|
||||
final BytesRef value;
|
||||
|
||||
/* Size of BytesRef: 2*INT + ARRAY_HEADER + PTR */
|
||||
private static final long RAW_VALUE_SIZE_IN_BYTES = NUM_BYTES_ARRAY_HEADER + 2*Integer.BYTES + NUM_BYTES_OBJECT_REF;
|
||||
|
@ -132,7 +132,7 @@ abstract class DocValuesUpdate {
|
|||
|
||||
/** An in-place update to a numeric DocValues field */
|
||||
static final class NumericDocValuesUpdate extends DocValuesUpdate {
|
||||
private final long value;
|
||||
final long value;
|
||||
|
||||
NumericDocValuesUpdate(Term term, String field, long value) {
|
||||
this(term, field, value, BufferedUpdates.MAX_INT);
|
||||
|
|
|
@ -1347,7 +1347,82 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
* to delete documents indexed after opening the NRT
|
||||
* reader you must use {@link #deleteDocuments(Term...)}). */
|
||||
public synchronized long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
|
||||
// NOTE: DON'T use docID inside the closure
|
||||
return tryModifyDocument(readerIn, docID, (leafDocId, rld) -> {
|
||||
if (rld.delete(leafDocId)) {
|
||||
if (isFullyDeleted(rld)) {
|
||||
dropDeletedSegment(rld.info);
|
||||
checkpoint();
|
||||
}
|
||||
|
||||
// Must bump changeCount so if no other changes
|
||||
// happened, we still commit this change:
|
||||
changed();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** Expert: attempts to update doc values by document ID, as long as
|
||||
* the provided reader is a near-real-time reader (from {@link
|
||||
* DirectoryReader#open(IndexWriter)}). If the
|
||||
* provided reader is an NRT reader obtained from this
|
||||
* writer, and its segment has not been merged away, then
|
||||
* the update succeeds and this method returns a valid (> 0) sequence
|
||||
* number; else, it returns -1 and the caller must then
|
||||
* either retry the update and resolve the document again.
|
||||
*
|
||||
* <b>NOTE</b>: this method can only updates documents
|
||||
* visible to the currently open NRT reader. If you need
|
||||
* to update documents indexed after opening the NRT
|
||||
* reader you must use {@link #updateDocValues(Term, Field...)}. */
|
||||
public synchronized long tryUpdateDocValue(IndexReader readerIn, int docID, Field... fields) throws IOException {
|
||||
// NOTE: DON'T use docID inside the closure
|
||||
final DocValuesUpdate[] dvUpdates = buildDocValuesUpdate(null, fields);
|
||||
return tryModifyDocument(readerIn, docID, (leafDocId, rld) -> {
|
||||
long nextGen = bufferedUpdatesStream.getNextGen();
|
||||
try {
|
||||
Map<String, DocValuesFieldUpdates> fieldUpdatesMap = new HashMap<>();
|
||||
for (DocValuesUpdate update : dvUpdates) {
|
||||
DocValuesFieldUpdates docValuesFieldUpdates = fieldUpdatesMap.computeIfAbsent(update.field, k -> {
|
||||
switch (update.type) {
|
||||
case NUMERIC:
|
||||
return new NumericDocValuesFieldUpdates(nextGen, k, rld.info.info.maxDoc());
|
||||
case BINARY:
|
||||
return new BinaryDocValuesFieldUpdates(nextGen, k, rld.info.info.maxDoc());
|
||||
default:
|
||||
throw new AssertionError("type: " + update.type + " is not supported");
|
||||
}
|
||||
});
|
||||
switch (update.type) {
|
||||
case NUMERIC:
|
||||
docValuesFieldUpdates.add(leafDocId, ((NumericDocValuesUpdate) update).value);
|
||||
break;
|
||||
case BINARY:
|
||||
docValuesFieldUpdates.add(leafDocId, ((BinaryDocValuesUpdate) update).value);
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("type: " + update.type + " is not supported");
|
||||
}
|
||||
}
|
||||
for (DocValuesFieldUpdates updates : fieldUpdatesMap.values()) {
|
||||
updates.finish();
|
||||
rld.addDVUpdate(updates);
|
||||
}
|
||||
} finally {
|
||||
bufferedUpdatesStream.finishedSegment(nextGen);
|
||||
}
|
||||
// Must bump changeCount so if no other changes
|
||||
// happened, we still commit this change:
|
||||
changed();
|
||||
});
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface DocModifier {
|
||||
void run(int docId, ReadersAndUpdates readersAndUpdates) throws IOException;
|
||||
}
|
||||
|
||||
private synchronized long tryModifyDocument(IndexReader readerIn, int docID, DocModifier toApply) throws IOException {
|
||||
final LeafReader reader;
|
||||
if (readerIn instanceof LeafReader) {
|
||||
// Reader is already atomic: use the incoming docID:
|
||||
|
@ -1365,7 +1440,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
if (!(reader instanceof SegmentReader)) {
|
||||
throw new IllegalArgumentException("the reader must be a SegmentReader or composite reader containing only SegmentReaders");
|
||||
}
|
||||
|
||||
|
||||
final SegmentCommitInfo info = ((SegmentReader) reader).getSegmentInfo();
|
||||
|
||||
// TODO: this is a slow linear search, but, number of
|
||||
|
@ -1377,21 +1452,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
ReadersAndUpdates rld = getPooledInstance(info, false);
|
||||
if (rld != null) {
|
||||
synchronized(bufferedUpdatesStream) {
|
||||
if (rld.delete(docID)) {
|
||||
if (isFullyDeleted(rld)) {
|
||||
dropDeletedSegment(rld.info);
|
||||
checkpoint();
|
||||
}
|
||||
|
||||
// Must bump changeCount so if no other changes
|
||||
// happened, we still commit this change:
|
||||
changed();
|
||||
}
|
||||
toApply.run(docID, rld);
|
||||
return docWriter.deleteQueue.getNextSequenceNumber();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,17 +18,25 @@ package org.apache.lucene.index;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.BinaryDocValuesField;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.Field.Store;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.document.StringField;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
|
@ -258,7 +266,6 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
|
|||
writer.close();
|
||||
|
||||
DirectoryReader reader = DirectoryReader.open(dir);
|
||||
BytesRef scratch = new BytesRef();
|
||||
for (LeafReaderContext context : reader.leaves()) {
|
||||
LeafReader r = context.reader();
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
|
@ -305,8 +312,14 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
|
|||
int doc = random().nextInt(numDocs);
|
||||
Term t = new Term("id", "doc" + doc);
|
||||
long value = random().nextLong();
|
||||
writer.updateDocValues(t, new BinaryDocValuesField("f", TestBinaryDocValuesUpdates.toBytes(value)),
|
||||
new NumericDocValuesField("cf", value*2));
|
||||
if (random().nextBoolean()) {
|
||||
doUpdate(t, writer, new BinaryDocValuesField("f", TestBinaryDocValuesUpdates.toBytes(value)),
|
||||
new NumericDocValuesField("cf", value*2));
|
||||
} else {
|
||||
writer.updateDocValues(t, new BinaryDocValuesField("f", TestBinaryDocValuesUpdates.toBytes(value)),
|
||||
new NumericDocValuesField("cf", value*2));
|
||||
}
|
||||
|
||||
DirectoryReader reader = DirectoryReader.open(writer);
|
||||
for (LeafReaderContext context : reader.leaves()) {
|
||||
LeafReader r = context.reader();
|
||||
|
@ -394,5 +407,138 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
|
|||
|
||||
dir.close();
|
||||
}
|
||||
|
||||
|
||||
public void testTryUpdateDocValues() throws IOException {
|
||||
Directory dir = newDirectory();
|
||||
IndexWriterConfig conf = newIndexWriterConfig();
|
||||
IndexWriter writer = new IndexWriter(dir, conf);
|
||||
int numDocs = 1 + random().nextInt(128);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
Document doc = new Document();
|
||||
doc.add(new StringField("id", "" + i, Store.YES));
|
||||
doc.add(new NumericDocValuesField("id", i));
|
||||
doc.add(new BinaryDocValuesField("binaryId", new BytesRef(new byte[] {(byte)i})));
|
||||
writer.addDocument(doc);
|
||||
if (random().nextBoolean()) {
|
||||
writer.flush();
|
||||
}
|
||||
}
|
||||
int doc = random().nextInt(numDocs);
|
||||
doUpdate(new Term("id", "" + doc), writer, new NumericDocValuesField("id", doc + 1),
|
||||
new BinaryDocValuesField("binaryId", new BytesRef(new byte[]{(byte) (doc + 1)})));
|
||||
IndexReader reader = writer.getReader();
|
||||
NumericDocValues idValues = null;
|
||||
BinaryDocValues binaryIdValues = null;
|
||||
for (LeafReaderContext c : reader.leaves()) {
|
||||
TopDocs topDocs = new IndexSearcher(c.reader()).search(new TermQuery(new Term("id", "" + doc)), 10);
|
||||
if (topDocs.totalHits == 1) {
|
||||
assertNull(idValues);
|
||||
assertNull(binaryIdValues);
|
||||
idValues = c.reader().getNumericDocValues("id");
|
||||
assertEquals(topDocs.scoreDocs[0].doc, idValues.advance(topDocs.scoreDocs[0].doc));
|
||||
binaryIdValues = c.reader().getBinaryDocValues("binaryId");
|
||||
assertEquals(topDocs.scoreDocs[0].doc, binaryIdValues.advance(topDocs.scoreDocs[0].doc));
|
||||
} else {
|
||||
assertEquals(0, topDocs.totalHits);
|
||||
}
|
||||
}
|
||||
|
||||
assertNotNull(idValues);
|
||||
assertNotNull(binaryIdValues);
|
||||
|
||||
assertEquals(doc+1, idValues.longValue());
|
||||
assertEquals(new BytesRef(new byte[] {(byte)(doc+1)}), binaryIdValues.binaryValue());
|
||||
IOUtils.close(reader, writer, dir);
|
||||
}
|
||||
|
||||
public void testTryUpdateMultiThreaded() throws IOException, BrokenBarrierException, InterruptedException {
|
||||
Directory dir = newDirectory();
|
||||
IndexWriterConfig conf = newIndexWriterConfig();
|
||||
IndexWriter writer = new IndexWriter(dir, conf);
|
||||
ReentrantLock[] locks = new ReentrantLock[25 + random().nextInt(50)];
|
||||
int[] values = new int[locks.length];
|
||||
for (int i = 0; i < locks.length; i++) {
|
||||
locks[i] = new ReentrantLock();
|
||||
Document doc = new Document();
|
||||
values[i] = random().nextInt();
|
||||
doc.add(new StringField("id", Integer.toString(i), Store.NO));
|
||||
doc.add(new NumericDocValuesField("value", values[i]));
|
||||
writer.addDocument(doc);
|
||||
}
|
||||
|
||||
Thread[] threads = new Thread[2 + random().nextInt(3)];
|
||||
CyclicBarrier barrier = new CyclicBarrier(threads.length + 1);
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
threads[i] = new Thread(() -> {
|
||||
try {
|
||||
barrier.await();
|
||||
for (int doc = 0; doc < 1000; doc++) {
|
||||
int docId = random().nextInt(locks.length);
|
||||
locks[docId].lock();
|
||||
try {
|
||||
int value = random().nextInt();
|
||||
if (random().nextBoolean()) {
|
||||
writer.updateDocValues(new Term("id", docId + ""), new NumericDocValuesField("value", value));
|
||||
} else {
|
||||
doUpdate(new Term("id", docId + ""), writer, new NumericDocValuesField("value", value));
|
||||
}
|
||||
values[docId] = value;
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError(e);
|
||||
} finally {
|
||||
locks[docId].unlock();
|
||||
}
|
||||
|
||||
if (rarely()) {
|
||||
writer.flush();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
barrier.await();
|
||||
for (Thread t : threads) {
|
||||
t.join();
|
||||
}
|
||||
try (DirectoryReader reader = writer.getReader()) {
|
||||
for (int i = 0; i < locks.length; i++) {
|
||||
locks[i].lock();
|
||||
try {
|
||||
int value = values[i];
|
||||
TopDocs topDocs = new IndexSearcher(reader).search(new TermQuery(new Term("id", "" + i)), 10);
|
||||
assertEquals(topDocs.totalHits, 1);
|
||||
int docID = topDocs.scoreDocs[0].doc;
|
||||
List<LeafReaderContext> leaves = reader.leaves();
|
||||
int subIndex = ReaderUtil.subIndex(docID, leaves);
|
||||
LeafReader leafReader = leaves.get(subIndex).reader();
|
||||
docID -= leaves.get(subIndex).docBase;
|
||||
NumericDocValues numericDocValues = leafReader.getNumericDocValues("value");
|
||||
assertEquals(docID, numericDocValues.advance(docID));
|
||||
assertEquals(numericDocValues.longValue(), value);
|
||||
} finally {
|
||||
locks[i].unlock();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
IOUtils.close(writer, dir);
|
||||
}
|
||||
|
||||
static void doUpdate(Term doc, IndexWriter writer, Field... fields) throws IOException {
|
||||
long seqId = -1;
|
||||
do { // retry if we just committing a merge
|
||||
try (DirectoryReader reader = writer.getReader()) {
|
||||
TopDocs topDocs = new IndexSearcher(reader).search(new TermQuery(doc), 10);
|
||||
assertEquals(1, topDocs.totalHits);
|
||||
int theDoc = topDocs.scoreDocs[0].doc;
|
||||
seqId = writer.tryUpdateDocValue(reader, theDoc, fields);
|
||||
}
|
||||
} while (seqId == -1);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue