LUCENE-8200: Allow doc-values to be updated atomically together with a document

Today we can only update a document by deleting all previously indexed
documents for the given term. In some cases like when deletes are not
`final` in the way that documents that are marked as deleted should not
be merged away a `soft-delete` is needed which is possible when doc-values
updatea can be done atomically just like delete and add in updateDocument(s)

This change introduces such a soft update that reuses all code paths from deletes
to update all previously updated documents for a given term instead of marking it
as deleted. This is a spinnoff from LUCENE-8198
This commit is contained in:
Simon Willnauer 2018-03-12 12:27:06 +01:00
parent 9de0ebe797
commit ef291c9767
9 changed files with 506 additions and 128 deletions

View File

@ -94,7 +94,14 @@ Optimizations
to run faster. (Adrien Grand) to run faster. (Adrien Grand)
======================= Lucene 7.4.0 ======================= ======================= Lucene 7.4.0 =======================
(No Changes)
New Features
* LUCENE-8200: Allow doc-values to be updated atomically together
with a document. Doc-Values updates now can be used as a soft-delete
mechanism to all keeping several version of a document or already
deleted documents around for later reuse. See "IW.softUpdateDocument(...)"
for reference. (Simon Willnauer)
======================= Lucene 7.3.0 ======================= ======================= Lucene 7.3.0 =======================

View File

@ -433,7 +433,7 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer, long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
final Term delTerm) throws IOException, AbortingException { final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException, AbortingException {
boolean hasEvents = preUpdate(); boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock(); final ThreadState perThread = flushControl.obtainAndLock();
@ -449,7 +449,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread dwpt = perThread.dwpt; final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM(); final int dwptNumDocs = dwpt.getNumDocsInRAM();
try { try {
seqNo = dwpt.updateDocuments(docs, analyzer, delTerm); seqNo = dwpt.updateDocuments(docs, analyzer, delNode);
} catch (AbortingException ae) { } catch (AbortingException ae) {
flushControl.doOnAbort(perThread); flushControl.doOnAbort(perThread);
dwpt.abort(); dwpt.abort();
@ -460,7 +460,7 @@ final class DocumentsWriter implements Closeable, Accountable {
// accumulate our separate counter: // accumulate our separate counter:
numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs); numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
} }
final boolean isUpdate = delTerm != null; final boolean isUpdate = delNode != null && delNode.isDelete();
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
@ -477,7 +477,7 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer, long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
final Term delTerm) throws IOException, AbortingException { final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException, AbortingException {
boolean hasEvents = preUpdate(); boolean hasEvents = preUpdate();
@ -494,7 +494,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread dwpt = perThread.dwpt; final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM(); final int dwptNumDocs = dwpt.getNumDocsInRAM();
try { try {
seqNo = dwpt.updateDocument(doc, analyzer, delTerm); seqNo = dwpt.updateDocument(doc, analyzer, delNode);
} catch (AbortingException ae) { } catch (AbortingException ae) {
flushControl.doOnAbort(perThread); flushControl.doOnAbort(perThread);
dwpt.abort(); dwpt.abort();
@ -505,7 +505,7 @@ final class DocumentsWriter implements Closeable, Accountable {
// accumulate our separate counter: // accumulate our separate counter:
numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs); numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
} }
final boolean isUpdate = delTerm != null; final boolean isUpdate = delNode != null && delNode.isDelete();
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;

View File

@ -56,7 +56,7 @@ import org.apache.lucene.util.InfoStream;
* <ol> * <ol>
* <li>consumes a document and finishes its processing</li> * <li>consumes a document and finishes its processing</li>
* <li>updates its private {@link DeleteSlice} either by calling * <li>updates its private {@link DeleteSlice} either by calling
* {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the * {@link #updateSlice(DeleteSlice)} or {@link #add(Node, DeleteSlice)} (if the
* document has a delTerm)</li> * document has a delTerm)</li>
* <li>applies all deletes in the slice to its private {@link BufferedUpdates} * <li>applies all deletes in the slice to its private {@link BufferedUpdates}
* and resets it</li> * and resets it</li>
@ -131,13 +131,20 @@ final class DocumentsWriterDeleteQueue implements Accountable {
tryApplyGlobalSlice(); tryApplyGlobalSlice();
return seqNo; return seqNo;
} }
static Node<Term> newNode(Term term) {
return new TermNode(term);
}
static Node<DocValuesUpdate[]> newNode(DocValuesUpdate... updates) {
return new DocValuesUpdatesNode(updates);
}
/** /**
* invariant for document update * invariant for document update
*/ */
long add(Term term, DeleteSlice slice) { long add(Node<?> deleteNode, DeleteSlice slice) {
final TermNode termNode = new TermNode(term); long seqNo = add(deleteNode);
long seqNo = add(termNode);
/* /*
* this is an update request where the term is the updated documents * this is an update request where the term is the updated documents
* delTerm. in that case we need to guarantee that this insert is atomic * delTerm. in that case we need to guarantee that this insert is atomic
@ -148,7 +155,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
* will apply this delete next time we update our slice and one of the two * will apply this delete next time we update our slice and one of the two
* competing updates wins! * competing updates wins!
*/ */
slice.sliceTail = termNode; slice.sliceTail = deleteNode;
assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add"; assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add";
tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe
// we can do it just every n times or so? // we can do it just every n times or so?
@ -291,12 +298,20 @@ final class DocumentsWriterDeleteQueue implements Accountable {
sliceHead = sliceTail; sliceHead = sliceTail;
} }
/**
* Returns <code>true</code> iff the given node is identical to the the slices tail,
* otherwise <code>false</code>.
*/
boolean isTail(Node<?> node) {
return sliceTail == node;
}
/** /**
* Returns <code>true</code> iff the given item is identical to the item * Returns <code>true</code> iff the given item is identical to the item
* hold by the slices tail, otherwise <code>false</code>. * hold by the slices tail, otherwise <code>false</code>.
*/ */
boolean isTailItem(Object item) { boolean isTailItem(Object object) {
return sliceTail.item == item; return sliceTail.item == object;
} }
boolean isEmpty() { boolean isEmpty() {
@ -319,7 +334,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
} }
} }
private static class Node<T> { static class Node<T> {
volatile Node<?> next; volatile Node<?> next;
final T item; final T item;
@ -330,6 +345,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
void apply(BufferedUpdates bufferedDeletes, int docIDUpto) { void apply(BufferedUpdates bufferedDeletes, int docIDUpto) {
throw new IllegalStateException("sentinel item must never be applied"); throw new IllegalStateException("sentinel item must never be applied");
} }
boolean isDelete() {
return true;
}
} }
private static final class TermNode extends Node<Term> { private static final class TermNode extends Node<Term> {
@ -347,6 +366,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
public String toString() { public String toString() {
return "del=" + item; return "del=" + item;
} }
} }
private static final class QueryArrayNode extends Node<Query[]> { private static final class QueryArrayNode extends Node<Query[]> {
@ -378,6 +398,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
public String toString() { public String toString() {
return "dels=" + Arrays.toString(item); return "dels=" + Arrays.toString(item);
} }
} }
private static final class DocValuesUpdatesNode extends Node<DocValuesUpdate[]> { private static final class DocValuesUpdatesNode extends Node<DocValuesUpdate[]> {
@ -402,6 +423,12 @@ final class DocumentsWriterDeleteQueue implements Accountable {
} }
} }
@Override
boolean isDelete() {
return false;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View File

@ -218,7 +218,7 @@ class DocumentsWriterPerThread {
} }
} }
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException, AbortingException {
testPoint("DocumentsWriterPerThread addDocument start"); testPoint("DocumentsWriterPerThread addDocument start");
assert deleteQueue != null; assert deleteQueue != null;
reserveOneDoc(); reserveOneDoc();
@ -226,7 +226,7 @@ class DocumentsWriterPerThread {
docState.analyzer = analyzer; docState.analyzer = analyzer;
docState.docID = numDocsInRAM; docState.docID = numDocsInRAM;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name); infoStream.message("DWPT", Thread.currentThread().getName() + " update delNode=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
} }
// Even on exception, the document is still added (but marked // Even on exception, the document is still added (but marked
// deleted), so we don't need to un-reserve at that point. // deleted), so we don't need to un-reserve at that point.
@ -250,15 +250,15 @@ class DocumentsWriterPerThread {
} }
} }
return finishDocument(delTerm); return finishDocument(deleteNode);
} }
public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException, AbortingException {
testPoint("DocumentsWriterPerThread addDocuments start"); testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null; assert deleteQueue != null;
docState.analyzer = analyzer; docState.analyzer = analyzer;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name); infoStream.message("DWPT", Thread.currentThread().getName() + " update delNode=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
} }
int docCount = 0; int docCount = 0;
boolean allDocsIndexed = false; boolean allDocsIndexed = false;
@ -296,9 +296,9 @@ class DocumentsWriterPerThread {
// succeeded, but apply it only to docs prior to when // succeeded, but apply it only to docs prior to when
// this batch started: // this batch started:
long seqNo; long seqNo;
if (delTerm != null) { if (deleteNode != null) {
seqNo = deleteQueue.add(delTerm, deleteSlice); seqNo = deleteQueue.add(deleteNode, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail";
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount); deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
return seqNo; return seqNo;
} else { } else {
@ -328,7 +328,7 @@ class DocumentsWriterPerThread {
} }
} }
private long finishDocument(Term delTerm) { private long finishDocument(DocumentsWriterDeleteQueue.Node<?> deleteNode) {
/* /*
* here we actually finish the document in two steps 1. push the delete into * here we actually finish the document in two steps 1. push the delete into
* the queue and update our slice. 2. increment the DWPT private document * the queue and update our slice. 2. increment the DWPT private document
@ -339,9 +339,9 @@ class DocumentsWriterPerThread {
*/ */
boolean applySlice = numDocsInRAM != 0; boolean applySlice = numDocsInRAM != 0;
long seqNo; long seqNo;
if (delTerm != null) { if (deleteNode != null) {
seqNo = deleteQueue.add(delTerm, deleteSlice); seqNo = deleteQueue.add(deleteNode, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail";
} else { } else {
seqNo = deleteQueue.updateSlice(deleteSlice); seqNo = deleteQueue.updateSlice(deleteSlice);

View File

@ -141,42 +141,42 @@ class FrozenBufferedUpdates {
throws IOException { throws IOException {
// TODO: we could do better here, e.g. collate the updates by field // TODO: we could do better here, e.g. collate the updates by field
// so if you are updating 2 fields interleaved we don't keep writing the field strings // so if you are updating 2 fields interleaved we don't keep writing the field strings
try (RAMOutputStream out = new RAMOutputStream()) {
String lastTermField = null;
String lastUpdateField = null;
for (LinkedHashMap<Term, NumericDocValuesUpdate> numericUpdates : numericDVUpdates.values()) {
numericDVUpdateCount += numericUpdates.size();
for (NumericDocValuesUpdate update : numericUpdates.values()) {
RAMOutputStream out = new RAMOutputStream(); int code = update.term.bytes().length << 2;
String lastTermField = null;
String lastUpdateField = null;
for (LinkedHashMap<Term,NumericDocValuesUpdate> numericUpdates : numericDVUpdates.values()) {
numericDVUpdateCount += numericUpdates.size();
for (NumericDocValuesUpdate update : numericUpdates.values()) {
int code = update.term.bytes().length << 2; String termField = update.term.field();
if (termField.equals(lastTermField) == false) {
code |= 1;
}
String updateField = update.field;
if (updateField.equals(lastUpdateField) == false) {
code |= 2;
}
out.writeVInt(code);
out.writeVInt(update.docIDUpto);
if ((code & 1) != 0) {
out.writeString(termField);
lastTermField = termField;
}
if ((code & 2) != 0) {
out.writeString(updateField);
lastUpdateField = updateField;
}
String termField = update.term.field(); out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length);
if (termField.equals(lastTermField) == false) { out.writeZLong(((Long) update.value).longValue());
code |= 1;
} }
String updateField = update.field;
if (updateField.equals(lastUpdateField) == false) {
code |= 2;
}
out.writeVInt(code);
out.writeVInt(update.docIDUpto);
if ((code & 1) != 0) {
out.writeString(termField);
lastTermField = termField;
}
if ((code & 2) != 0) {
out.writeString(updateField);
lastUpdateField = updateField;
}
out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length);
out.writeZLong(((Long) update.value).longValue());
} }
byte[] bytes = new byte[(int) out.getFilePointer()];
out.writeTo(bytes, 0);
return bytes;
} }
byte[] bytes = new byte[(int) out.getFilePointer()];
out.writeTo(bytes, 0);
return bytes;
} }
private byte[] freezeBinaryDVUpdates(Map<String,LinkedHashMap<Term,BinaryDocValuesUpdate>> binaryDVUpdates) private byte[] freezeBinaryDVUpdates(Map<String,LinkedHashMap<Term,BinaryDocValuesUpdate>> binaryDVUpdates)
@ -184,43 +184,44 @@ class FrozenBufferedUpdates {
// TODO: we could do better here, e.g. collate the updates by field // TODO: we could do better here, e.g. collate the updates by field
// so if you are updating 2 fields interleaved we don't keep writing the field strings // so if you are updating 2 fields interleaved we don't keep writing the field strings
RAMOutputStream out = new RAMOutputStream(); try (RAMOutputStream out = new RAMOutputStream()) {
String lastTermField = null; String lastTermField = null;
String lastUpdateField = null; String lastUpdateField = null;
for (LinkedHashMap<Term,BinaryDocValuesUpdate> binaryUpdates : binaryDVUpdates.values()) { for (LinkedHashMap<Term, BinaryDocValuesUpdate> binaryUpdates : binaryDVUpdates.values()) {
binaryDVUpdateCount += binaryUpdates.size(); binaryDVUpdateCount += binaryUpdates.size();
for (BinaryDocValuesUpdate update : binaryUpdates.values()) { for (BinaryDocValuesUpdate update : binaryUpdates.values()) {
int code = update.term.bytes().length << 2; int code = update.term.bytes().length << 2;
String termField = update.term.field(); String termField = update.term.field();
if (termField.equals(lastTermField) == false) { if (termField.equals(lastTermField) == false) {
code |= 1; code |= 1;
} }
String updateField = update.field; String updateField = update.field;
if (updateField.equals(lastUpdateField) == false) { if (updateField.equals(lastUpdateField) == false) {
code |= 2; code |= 2;
} }
out.writeVInt(code); out.writeVInt(code);
out.writeVInt(update.docIDUpto); out.writeVInt(update.docIDUpto);
if (termField.equals(lastTermField) == false) { if (termField.equals(lastTermField) == false) {
out.writeString(termField); out.writeString(termField);
lastTermField = termField; lastTermField = termField;
} }
if (updateField.equals(lastUpdateField) == false) { if (updateField.equals(lastUpdateField) == false) {
out.writeString(updateField); out.writeString(updateField);
lastUpdateField = updateField; lastUpdateField = updateField;
} }
out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length); out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length);
BytesRef value = (BytesRef) update.value; BytesRef value = (BytesRef) update.value;
out.writeVInt(value.length); out.writeVInt(value.length);
out.writeBytes(value.bytes, value.offset, value.length); out.writeBytes(value.bytes, value.offset, value.length);
}
} }
byte[] bytes = new byte[(int) out.getFilePointer()];
out.writeTo(bytes, 0);
return bytes;
} }
byte[] bytes = new byte[(int) out.getFilePointer()];
out.writeTo(bytes, 0);
return bytes;
} }
/** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null /** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null

View File

@ -1459,7 +1459,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws IOException if there is a low-level IO error * @throws IOException if there is a low-level IO error
*/ */
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException { public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
return updateDocument(null, doc); return updateDocument((DocumentsWriterDeleteQueue.Node<?>) null, doc);
} }
/** /**
@ -1503,7 +1503,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @lucene.experimental * @lucene.experimental
*/ */
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException { public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
return updateDocuments(null, docs); return updateDocuments((DocumentsWriterDeleteQueue.Node<?>) null, docs);
} }
/** /**
@ -1523,11 +1523,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @lucene.experimental * @lucene.experimental
*/ */
public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException { public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
return updateDocuments(delTerm == null ? null : DocumentsWriterDeleteQueue.newNode(delTerm), docs);
}
private long updateDocuments(final DocumentsWriterDeleteQueue.Node<?> delNode, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
ensureOpen(); ensureOpen();
try { try {
boolean success = false; boolean success = false;
try { try {
long seqNo = docWriter.updateDocuments(docs, analyzer, delTerm); long seqNo = docWriter.updateDocuments(docs, analyzer, delNode);
if (seqNo < 0) { if (seqNo < 0) {
seqNo = -seqNo; seqNo = -seqNo;
processEvents(true, false); processEvents(true, false);
@ -1549,6 +1553,48 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
} }
/**
* Expert:
* Atomically updates documents matching the provided
* term with the given doc-values fields
* and adds a block of documents with sequentially
* assigned document IDs, such that an external reader
* will see all or none of the documents.
*
* One use of this API is to retain older versions of
* documents instead of replacing them. The existing
* documents can be updated to reflect they are no
* longer current while atomically adding new documents
* at the same time.
*
* In contrast to {@link #updateDocuments(Term, Iterable)}
* this method will not delete documents in the index
* matching the given term but instead update them with
* the given doc-values fields which can be used as a
* soft-delete mechanism.
*
* See {@link #addDocuments(Iterable)}
* and {@link #updateDocuments(Term, Iterable)}.
*
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
* @lucene.experimental
*/
public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends IndexableField>> docs, Field... softDeletes) throws IOException {
if (term == null) {
throw new IllegalArgumentException("term must not be null");
}
if (softDeletes == null || softDeletes.length == 0) {
throw new IllegalArgumentException("at least one soft delete must be present");
}
return updateDocuments(DocumentsWriterDeleteQueue.newNode(buildDocValuesUpdate(term, softDeletes, false)), docs);
}
/** Expert: attempts to delete by document ID, as long as /** Expert: attempts to delete by document ID, as long as
* the provided reader is a near-real-time reader (from {@link * the provided reader is a near-real-time reader (from {@link
* DirectoryReader#open(IndexWriter)}). If the * DirectoryReader#open(IndexWriter)}). If the
@ -1720,11 +1766,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws IOException if there is a low-level IO error * @throws IOException if there is a low-level IO error
*/ */
public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException { public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
return updateDocument(term == null ? null : DocumentsWriterDeleteQueue.newNode(term), doc);
}
private long updateDocument(final DocumentsWriterDeleteQueue.Node<?> delNode,
Iterable<? extends IndexableField> doc) throws IOException {
ensureOpen(); ensureOpen();
try { try {
boolean success = false; boolean success = false;
try { try {
long seqNo = docWriter.updateDocument(doc, analyzer, term); long seqNo = docWriter.updateDocument(doc, analyzer, delNode);
if (seqNo < 0) { if (seqNo < 0) {
seqNo = - seqNo; seqNo = - seqNo;
processEvents(true, false); processEvents(true, false);
@ -1746,6 +1797,50 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
} }
/**
* Expert:
* Updates a document by first updating the document(s)
* containing <code>term</code> with the given doc-values fields
* and then adding the new document. The doc-values update and
* then add are atomic as seen by a reader on the same index
* (flush may happen only after the add).
*
* One use of this API is to retain older versions of
* documents instead of replacing them. The existing
* documents can be updated to reflect they are no
* longer current while atomically adding new documents
* at the same time.
*
* In contrast to {@link #updateDocument(Term, Iterable)}
* this method will not delete documents in the index
* matching the given term but instead update them with
* the given doc-values fields which can be used as a
* soft-delete mechanism.
*
* See {@link #addDocuments(Iterable)}
* and {@link #updateDocuments(Term, Iterable)}.
*
*
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
* @lucene.experimental
*/
public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field... softDeletes) throws IOException {
if (term == null) {
throw new IllegalArgumentException("term must not be null");
}
if (softDeletes == null || softDeletes.length == 0) {
throw new IllegalArgumentException("at least one soft delete must be present");
}
return updateDocument(DocumentsWriterDeleteQueue.newNode(buildDocValuesUpdate(term, softDeletes, false)), doc);
}
/** /**
* Updates a document's {@link NumericDocValues} for <code>field</code> to the * Updates a document's {@link NumericDocValues} for <code>field</code> to the
* given <code>value</code>. You can only update fields that already exist in * given <code>value</code>. You can only update fields that already exist in
@ -1855,6 +1950,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/ */
public long updateDocValues(Term term, Field... updates) throws IOException { public long updateDocValues(Term term, Field... updates) throws IOException {
ensureOpen(); ensureOpen();
DocValuesUpdate[] dvUpdates = buildDocValuesUpdate(term, updates, true);
try {
long seqNo = docWriter.updateDocValues(dvUpdates);
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true, false);
}
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocValues");
// dead code but javac disagrees:
return -1;
}
}
private DocValuesUpdate[] buildDocValuesUpdate(Term term, Field[] updates, boolean enforceFieldExistence) {
DocValuesUpdate[] dvUpdates = new DocValuesUpdate[updates.length]; DocValuesUpdate[] dvUpdates = new DocValuesUpdate[updates.length];
for (int i = 0; i < updates.length; i++) { for (int i = 0; i < updates.length; i++) {
final Field f = updates[i]; final Field f = updates[i];
@ -1865,7 +1977,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (dvType == DocValuesType.NONE) { if (dvType == DocValuesType.NONE) {
throw new IllegalArgumentException("can only update NUMERIC or BINARY fields! field=" + f.name()); throw new IllegalArgumentException("can only update NUMERIC or BINARY fields! field=" + f.name());
} }
if (!globalFieldNumberMap.contains(f.name(), dvType)) { if (enforceFieldExistence && !globalFieldNumberMap.contains(f.name(), dvType)) {
throw new IllegalArgumentException("can only update existing docvalues fields! field=" + f.name() + ", type=" + dvType); throw new IllegalArgumentException("can only update existing docvalues fields! field=" + f.name() + ", type=" + dvType);
} }
if (config.getIndexSortFields().contains(f.name())) { if (config.getIndexSortFields().contains(f.name())) {
@ -1882,21 +1994,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
throw new IllegalArgumentException("can only update NUMERIC or BINARY fields: field=" + f.name() + ", type=" + dvType); throw new IllegalArgumentException("can only update NUMERIC or BINARY fields: field=" + f.name() + ", type=" + dvType);
} }
} }
try { return dvUpdates;
long seqNo = docWriter.updateDocValues(dvUpdates);
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true, false);
}
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocValues");
// dead code but javac disagrees:
return -1;
}
} }
// for test purpose // for test purpose
final synchronized int getSegmentCount(){ final synchronized int getSegmentCount(){
return segmentInfos.size(); return segmentInfos.size();
@ -3693,18 +3793,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
} }
private static class MergedDeletesAndUpdates {
ReadersAndUpdates mergedDeletesAndUpdates = null;
MergedDeletesAndUpdates() {}
final void init(ReaderPool readerPool, MergePolicy.OneMerge merge) throws IOException {
if (mergedDeletesAndUpdates == null) {
mergedDeletesAndUpdates = readerPool.get(merge.info, true);
}
}
}
/** /**
* Carefully merges deletes and updates for the segments we just merged. This * Carefully merges deletes and updates for the segments we just merged. This
* is tricky because, although merging will clear all deletes (compacts the * is tricky because, although merging will clear all deletes (compacts the

View File

@ -291,11 +291,6 @@ class ReadersAndUpdates {
return liveDocs; return liveDocs;
} }
public synchronized Bits getReadOnlyLiveDocs() {
liveDocsShared = true;
return liveDocs;
}
public synchronized void dropChanges() { public synchronized void dropChanges() {
assert Thread.holdsLock(writer); assert Thread.holdsLock(writer);
// Discard (don't save) changes when we are dropping // Discard (don't save) changes when we are dropping

View File

@ -236,8 +236,9 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
int i = 0; int i = 0;
while ((i = index.getAndIncrement()) < ids.length) { while ((i = index.getAndIncrement()) < ids.length) {
Term term = new Term("id", ids[i].toString()); Term term = new Term("id", ids[i].toString());
queue.add(term, slice); DocumentsWriterDeleteQueue.Node<Term> termNode = DocumentsWriterDeleteQueue.newNode(term);
assertTrue(slice.isTailItem(term)); queue.add(termNode, slice);
assertTrue(slice.isTail(termNode));
slice.apply(deletes, BufferedUpdates.MAX_INT); slice.apply(deletes, BufferedUpdates.MAX_INT);
} }
} }

View File

@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.io.StringReader; import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI; import java.net.URI;
import java.nio.file.FileSystem; import java.nio.file.FileSystem;
import java.nio.file.Files; import java.nio.file.Files;
@ -71,6 +72,7 @@ import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.PhraseQuery; import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
@ -86,6 +88,7 @@ import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.store.SimpleFSLockFactory; import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
@ -2955,5 +2958,261 @@ public class TestIndexWriter extends LuceneTestCase {
} }
} }
} }
private static Bits getSoftDeletesLiveDocs(LeafReader reader, String field) {
try {
NumericDocValues softDelete = reader.getNumericDocValues(field);
if (softDelete != null) {
BitSet bitSet = BitSet.of(softDelete, reader.maxDoc());
Bits inLiveDocs = reader.getLiveDocs() == null ? new Bits.MatchAllBits(reader.maxDoc()) : reader.getLiveDocs();
Bits newliveDocs = new Bits() {
@Override
public boolean get(int index) {
return inLiveDocs.get(index) && bitSet.get(index) == false;
}
@Override
public int length() {
return inLiveDocs.length();
}
};
return newliveDocs;
} else {
return reader.getLiveDocs();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static DirectoryReader wrapSoftDeletes(DirectoryReader reader, String field) throws IOException {
return new FilterDirectoryReader(reader, new FilterDirectoryReader.SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader reader) {
Bits softDeletesLiveDocs = getSoftDeletesLiveDocs(reader, field);
int numDocs = getNumDocs(reader, softDeletesLiveDocs);
return new FilterLeafReader(reader) {
@Override
public Bits getLiveDocs() {
return softDeletesLiveDocs;
}
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
@Override
public CacheHelper getCoreCacheHelper() {
return in.getCoreCacheHelper();
}
@Override
public int numDocs() {
return numDocs;
}
};
}
}) {
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return wrapSoftDeletes(in, field);
}
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
};
}
private static int getNumDocs(LeafReader reader, Bits softDeletesLiveDocs) {
int numDocs;
if (softDeletesLiveDocs == reader.getLiveDocs()) {
numDocs = reader.numDocs();
} else {
int tmp = 0;
for (int i = 0; i < softDeletesLiveDocs.length(); i++) {
if (softDeletesLiveDocs.get(i) ) {
tmp++;
}
}
numDocs = tmp;
}
return numDocs;
}
public void testSoftUpdateDocuments() throws IOException {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
expectThrows(IllegalArgumentException.class, () -> {
writer.softUpdateDocument(null, new Document(), new NumericDocValuesField("soft_delete", 1));
});
expectThrows(IllegalArgumentException.class, () -> {
writer.softUpdateDocument(new Term("id", "1"), new Document());
});
expectThrows(IllegalArgumentException.class, () -> {
writer.softUpdateDocuments(null, Arrays.asList(new Document()), new NumericDocValuesField("soft_delete", 1));
});
expectThrows(IllegalArgumentException.class, () -> {
writer.softUpdateDocuments(new Term("id", "1"), Arrays.asList(new Document()));
});
Document doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new StringField("version", "1", Field.Store.YES));
writer.addDocument(doc);
doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new StringField("version", "2", Field.Store.YES));
Field field = new NumericDocValuesField("soft_delete", 1);
writer.softUpdateDocument(new Term("id", "1"), doc, field);
DirectoryReader reader = wrapSoftDeletes(DirectoryReader.open(writer), "soft_delete");
assertEquals(2, reader.docFreq(new Term("id", "1")));
IndexSearcher searcher = new IndexSearcher(reader);
TopDocs topDocs = searcher.search(new TermQuery(new Term("id", "1")), 10);
assertEquals(1, topDocs.totalHits);
Document document = reader.document(topDocs.scoreDocs[0].doc);
assertEquals("2", document.get("version"));
// update the on-disk version
doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new StringField("version", "3", Field.Store.YES));
field = new NumericDocValuesField("soft_delete", 1);
writer.softUpdateDocument(new Term("id", "1"), doc, field);
DirectoryReader oldReader = reader;
reader = DirectoryReader.openIfChanged(reader, writer);
assertNotSame(reader, oldReader);
oldReader.close();
searcher = new IndexSearcher(reader);
topDocs = searcher.search(new TermQuery(new Term("id", "1")), 10);
assertEquals(1, topDocs.totalHits);
document = reader.document(topDocs.scoreDocs[0].doc);
assertEquals("3", document.get("version"));
// now delete it
writer.updateDocValues(new Term("id", "1"), field);
oldReader = reader;
reader = DirectoryReader.openIfChanged(reader, writer);
assertNotSame(reader, oldReader);
oldReader.close();
searcher = new IndexSearcher(reader);
topDocs = searcher.search(new TermQuery(new Term("id", "1")), 10);
assertEquals(0, topDocs.totalHits);
writer.close();
reader.close();
dir.close();
}
public void testSoftUpdatesConcurrently() throws IOException, InterruptedException {
Directory dir = newDirectory();
IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
AtomicBoolean mergeAwaySoftDeletes = new AtomicBoolean(random().nextBoolean());
indexWriterConfig.setMergePolicy(new OneMergeWrappingMergePolicy(indexWriterConfig.getMergePolicy(), towrap ->
new MergePolicy.OneMerge(towrap.segments) {
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
if (mergeAwaySoftDeletes.get() == false) {
return towrap.wrapForMerge(reader);
}
Bits softDeletesLiveDocs = getSoftDeletesLiveDocs(reader, "soft_delete");
int numDocs = getNumDocs(reader, softDeletesLiveDocs);
CodecReader wrapped = towrap.wrapForMerge(reader);
return new FilterCodecReader(wrapped) {
@Override
public CacheHelper getCoreCacheHelper() {
return in.getCoreCacheHelper();
}
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
@Override
public Bits getLiveDocs() {
return softDeletesLiveDocs;
}
@Override
public int numDocs() {
return numDocs;
}
};
}
}
));
IndexWriter writer = new IndexWriter(dir, indexWriterConfig);
Thread[] threads = new Thread[2 + random().nextInt(3)];
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch started = new CountDownLatch(threads.length);
boolean updateSeveralDocs = random().nextBoolean();
Set<String> ids = Collections.synchronizedSet(new HashSet<>());
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
try {
started.countDown();
startLatch.await();
for (int d = 0; d < 100; d++) {
String id = String.valueOf(random().nextInt(10));
if (updateSeveralDocs) {
Document doc = new Document();
doc.add(new StringField("id", id, Field.Store.YES));
writer.softUpdateDocuments(new Term("id", id), Arrays.asList(doc, doc),
new NumericDocValuesField("soft_delete", 1));
} else {
Document doc = new Document();
doc.add(new StringField("id", id, Field.Store.YES));
writer.softUpdateDocument(new Term("id", id), doc,
new NumericDocValuesField("soft_delete", 1));
}
ids.add(id);
}
} catch (IOException | InterruptedException e) {
throw new AssertionError(e);
}
});
threads[i].start();
}
started.await();
startLatch.countDown();
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
DirectoryReader reader = wrapSoftDeletes(DirectoryReader.open(writer), "soft_delete");
IndexSearcher searcher = new IndexSearcher(reader);
for (String id : ids) {
TopDocs topDocs = searcher.search(new TermQuery(new Term("id", id)), 10);
if (updateSeveralDocs) {
assertEquals(2, topDocs.totalHits);
assertEquals(Math.abs(topDocs.scoreDocs[0].doc - topDocs.scoreDocs[1].doc), 1);
} else {
assertEquals(1, topDocs.totalHits);
}
}
mergeAwaySoftDeletes.set(true);
writer.forceMerge(1);
DirectoryReader oldReader = reader;
reader = DirectoryReader.openIfChanged(reader, writer);
assertNotSame(oldReader, reader);
oldReader.close();
for (String id : ids) {
if (updateSeveralDocs) {
assertEquals(2, reader.docFreq(new Term("id", id)));
} else {
assertEquals(1, reader.docFreq(new Term("id", id)));
}
}
IOUtils.close(reader, writer, dir);
}
} }