diff --git a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
index 3faad496eb7..db9aa091f3a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
@@ -33,7 +33,7 @@ import org.apache.lucene.util.packed.PagedMutable;
  *
  * @lucene.experimental
  */
-class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
+final class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
 
   final static class Iterator extends DocValuesFieldUpdates.Iterator {
     private final int size;
@@ -55,14 +55,14 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
       value = values.clone();
       this.delGen = delGen;
     }
-    
+
     @Override
-    BytesRef value() {
+    BytesRef binaryValue() {
       value.offset = offset;
       value.length = length;
      return value;
     }
-    
+
     @Override
     public int nextDoc() {
       if (idx >= size) {
@@ -94,6 +94,11 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
     long delGen() {
       return delGen;
     }
+
+    @Override
+    long longValue() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private PagedMutable docs;
@@ -116,9 +121,18 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
     return size;
   }
 
-  // NOTE: we fully consume the incoming BytesRef so caller is free to reuse it after we return:
   @Override
-  synchronized public void add(int doc, Object value) {
+  public void add(int doc, long value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void add(int docId, DocValuesFieldUpdates.Iterator iterator) {
+    add(docId, iterator.binaryValue());
+  }
+
+  @Override
+  synchronized public void add(int doc, BytesRef value) {
     if (finished) {
       throw new IllegalStateException("already finished");
     }
@@ -130,8 +144,6 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
       throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
     }
 
-    BytesRef val = (BytesRef) value;
-
     // grow the structures to have room for more elements
     if (docs.size() == size) {
       docs = docs.grow(size + 1);
@@ -141,8 +153,8 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
 
     docs.set(size, doc);
     offsets.set(size, values.length());
-    lengths.set(size, val.length);
-    values.append(val);
+    lengths.set(size, value.length);
+    values.append(value);
     ++size;
   }
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
index a5a86e6774f..ae37f149fbd 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.IntFunction;
 
 import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
 import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
@@ -233,62 +234,49 @@ class BufferedUpdates {
     }
   }
 
-  public void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) {
-    LinkedHashMap<Term,NumericDocValuesUpdate> fieldUpdates = numericUpdates.get(update.field);
-    if (fieldUpdates == null) {
-      fieldUpdates = new LinkedHashMap<>();
-      numericUpdates.put(update.field, fieldUpdates);
-      bytesUsed.addAndGet(BYTES_PER_NUMERIC_FIELD_ENTRY);
-    }
-    final NumericDocValuesUpdate current = fieldUpdates.get(update.term);
-    if (current != null && docIDUpto < current.docIDUpto) {
-      // Only record the new number if it's greater than or equal to the current
-      // one. This is important because if multiple threads are replacing the
-      // same doc at nearly the same time, it's possible that one thread that
-      // got a higher docID is scheduled before the other threads.
-      return;
-    }
-
-    update.docIDUpto = docIDUpto;
-    // since it's a LinkedHashMap, we must first remove the Term entry so that
-    // it's added last (we're interested in insertion-order).
-    if (current != null) {
-      fieldUpdates.remove(update.term);
-    }
-    fieldUpdates.put(update.term, update);
-    numNumericUpdates.incrementAndGet();
-    if (current == null) {
-      bytesUsed.addAndGet(BYTES_PER_NUMERIC_UPDATE_ENTRY + update.sizeInBytes());
+  void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) {
+    if (addDocValuesUpdate(numericUpdates, update, docIDUpto, update::prepareForApply, BYTES_PER_NUMERIC_UPDATE_ENTRY,
+        BYTES_PER_NUMERIC_FIELD_ENTRY)) {
+      numNumericUpdates.incrementAndGet();
     }
   }
 
-  public void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) {
-    LinkedHashMap<Term,BinaryDocValuesUpdate> fieldUpdates = binaryUpdates.get(update.field);
+  void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) {
+    if (addDocValuesUpdate(binaryUpdates, update, docIDUpto, update::prepareForApply, BYTES_PER_BINARY_UPDATE_ENTRY,
+        BYTES_PER_BINARY_FIELD_ENTRY)) {
+      numBinaryUpdates.incrementAndGet();
+    }
+  }
+
+  private <T extends DocValuesUpdate> boolean addDocValuesUpdate(Map<String,LinkedHashMap<Term,T>> updates, T update,
+                                                                 int docIDUpto, IntFunction<T> prepareForApply,
+                                                                 long bytesPerUpdateEntry, long bytesPerFieldEntry) {
+    LinkedHashMap<Term,T> fieldUpdates = updates.get(update.field);
     if (fieldUpdates == null) {
       fieldUpdates = new LinkedHashMap<>();
-      binaryUpdates.put(update.field, fieldUpdates);
-      bytesUsed.addAndGet(BYTES_PER_BINARY_FIELD_ENTRY);
+      updates.put(update.field, fieldUpdates);
+      bytesUsed.addAndGet(bytesPerFieldEntry);
     }
-    final BinaryDocValuesUpdate current = fieldUpdates.get(update.term);
+    final T current = fieldUpdates.get(update.term);
     if (current != null && docIDUpto < current.docIDUpto) {
       // Only record the new number if it's greater than or equal to the current
       // one. This is important because if multiple threads are replacing the
       // same doc at nearly the same time, it's possible that one thread that
       // got a higher docID is scheduled before the other threads.
-      return;
+      return false;
     }
-
-    update.docIDUpto = docIDUpto;
+
     // since it's a LinkedHashMap, we must first remove the Term entry so that
     // it's added last (we're interested in insertion-order).
     if (current != null) {
       fieldUpdates.remove(update.term);
     }
-    fieldUpdates.put(update.term, update);
-    numBinaryUpdates.incrementAndGet();
+
+    fieldUpdates.put(update.term, prepareForApply.apply(docIDUpto)); // only make a copy if necessary
     if (current == null) {
-      bytesUsed.addAndGet(BYTES_PER_BINARY_UPDATE_ENTRY + update.sizeInBytes());
+      bytesUsed.addAndGet(bytesPerUpdateEntry + update.sizeInBytes());
     }
+    return true;
   }
 
   void clearDeleteTerms() {
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
index a711f79f4d4..4a7f5b84f60 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
@@ -16,6 +16,7 @@
  */
 package org.apache.lucene.index;
 
+import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.PriorityQueue;
 
@@ -26,7 +27,7 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
  *
  * @lucene.experimental
  */
-abstract class DocValuesFieldUpdates {
+abstract class DocValuesFieldUpdates implements Accountable {
 
   protected static final int PAGE_SIZE = 1024;
@@ -51,13 +52,18 @@ abstract class DocValuesFieldUpdates {
       throw new UnsupportedOperationException();
     }
 
+    @Override
     public abstract int nextDoc(); // no IOException
 
     /**
-     * Returns the value of the document returned from {@link #nextDoc()}. A
-     * {@code null} value means that it was unset for this document.
+     * Returns a long value for the current document if this iterator is a long iterator.
      */
-    abstract Object value();
+    abstract long longValue();
+
+    /**
+     * Returns a binary value for the current document if this iterator is a binary value iterator.
+     */
+    abstract BytesRef binaryValue();
 
     /** Returns delGen for this packet. */
     abstract long delGen();
@@ -73,7 +79,7 @@ abstract class DocValuesFieldUpdates {
         }
         @Override
         public BytesRef binaryValue() {
-          return (BytesRef) iterator.value();
+          return iterator.binaryValue();
         }
         @Override
         public boolean advanceExact(int target) {
@@ -100,7 +106,7 @@ abstract class DocValuesFieldUpdates {
     return new NumericDocValues() {
       @Override
       public long longValue() {
-        return ((Long)iterator.value()).longValue();
+        return iterator.longValue();
       }
       @Override
       public boolean advanceExact(int target) {
@@ -163,14 +169,9 @@ abstract class DocValuesFieldUpdates {
     }
 
     return new Iterator() {
-      private int doc;
-
-      private boolean first = true;
-
+      private int doc = -1;
       @Override
       public int nextDoc() {
-        // TODO: can we do away with this first boolean?
-        if (first == false) {
         // Advance all sub iterators past current doc
         while (true) {
           if (queue.size() == 0) {
@@ -189,21 +190,22 @@ abstract class DocValuesFieldUpdates {
             queue.updateTop();
           }
         }
-        } else {
-          doc = queue.top().docID();
-          first = false;
-        }
         return doc;
       }
-
+
       @Override
       public int docID() {
         return doc;
       }
 
       @Override
-      public Object value() {
-        return queue.top().value();
+      long longValue() {
+        return queue.top().longValue();
+      }
+
+      @Override
+      BytesRef binaryValue() {
+        return queue.top().binaryValue();
       }
 
       @Override
@@ -229,31 +231,34 @@ abstract class DocValuesFieldUpdates {
     this.type = type;
   }
 
-  public boolean getFinished() {
+  boolean getFinished() {
     return finished;
   }
 
+  abstract void add(int doc, long value);
+
+  abstract void add(int doc, BytesRef value);
+
   /**
-   * Add an update to a document. For unsetting a value you should pass
-   * {@code null}.
+   * Adds the value for the given docID.
+   * This method prevents conditional calls to {@link Iterator#longValue()} or {@link Iterator#binaryValue()}
+   * since the implementation knows if it's a long value or a binary value iterator.
    */
-  public abstract void add(int doc, Object value);
-
+  abstract void add(int docId, Iterator iterator);
+
   /**
    * Returns an {@link Iterator} over the updated documents and their
    * values.
    */
   // TODO: also use this for merging, instead of having to write through to disk first
-  public abstract Iterator iterator();
+  abstract Iterator iterator();
 
   /** Freezes internal data structures and sorts updates by docID for efficient iteration. */
-  public abstract void finish();
+  abstract void finish();
 
   /** Returns true if this instance contains any updates. */
-  public abstract boolean any();
+  abstract boolean any();
 
-  /** Returns approximate RAM bytes used. */
-  public abstract long ramBytesUsed();
+  abstract int size();
 
-  public abstract int size();
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
index a66f9300850..8229b6094fc 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
@@ -16,11 +16,16 @@
  */
 package org.apache.lucene.index;
 
+import java.io.IOException;
+
 import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
 import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
 
 import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
 
 /** An in-place update to a DocValues field. */
@@ -38,21 +43,22 @@ abstract class DocValuesUpdate {
   final DocValuesType type;
   final Term term;
   final String field;
-  final Object value;
-  int docIDUpto = -1; // unassigned until applied, and confusing that it's here, when it's just used in BufferedDeletes...
+  // used in BufferedDeletes to apply this update only to a slice of docs. It's initialized to BufferedUpdates.MAX_INT
+  // since that is always safe and, as it's the most common case, we save object creations.
+  final int docIDUpto;
 
   /**
    * Constructor.
    *
    * @param term the {@link Term} which determines the documents that will be updated
    * @param field the {@link NumericDocValuesField} to update
-   * @param value the updated value
    */
-  protected DocValuesUpdate(DocValuesType type, Term term, String field, Object value) {
+  protected DocValuesUpdate(DocValuesType type, Term term, String field, int docIDUpto) {
+    assert docIDUpto >= 0 : docIDUpto + " must be >= 0";
     this.type = type;
     this.term = term;
     this.field = field;
-    this.value = value;
+    this.docIDUpto = docIDUpto;
   }
 
   abstract long valueSizeInBytes();
@@ -65,38 +71,102 @@ abstract class DocValuesUpdate {
     sizeInBytes += valueSizeInBytes();
     return sizeInBytes;
   }
+
+  protected abstract String valueToString();
+
+  abstract void writeTo(DataOutput output) throws IOException;
 
   @Override
   public String toString() {
-    return "term=" + term + ",field=" + field + ",value=" + value + ",docIDUpto=" + docIDUpto;
+    return "term=" + term + ",field=" + field + ",value=" + valueToString() + ",docIDUpto=" + docIDUpto;
   }
 
   /** An in-place update to a binary DocValues field */
   static final class BinaryDocValuesUpdate extends DocValuesUpdate {
+    private final BytesRef value;
 
     /* Size of BytesRef: 2*INT + ARRAY_HEADER + PTR */
     private static final long RAW_VALUE_SIZE_IN_BYTES = NUM_BYTES_ARRAY_HEADER + 2*Integer.BYTES + NUM_BYTES_OBJECT_REF;
-
+
     BinaryDocValuesUpdate(Term term, String field, BytesRef value) {
-      super(DocValuesType.BINARY, term, field, value);
+      this(term, field, value, BufferedUpdates.MAX_INT);
+    }
+
+    private BinaryDocValuesUpdate(Term term, String field, BytesRef value, int docIDUpTo) {
+      super(DocValuesType.BINARY, term, field, docIDUpTo);
+      this.value = value;
+    }
+
+    BinaryDocValuesUpdate prepareForApply(int docIDUpto) {
+      if (docIDUpto == this.docIDUpto) {
+        return this; // it's a final value so we can safely reuse this instance
+      }
+      return new BinaryDocValuesUpdate(term, field, value, docIDUpto);
     }
 
     @Override
     long valueSizeInBytes() {
-      return RAW_VALUE_SIZE_IN_BYTES + ((BytesRef) value).bytes.length;
+      return RAW_VALUE_SIZE_IN_BYTES + value.bytes.length;
+    }
+
+    @Override
+    protected String valueToString() {
+      return value.toString();
+    }
+
+    @Override
+    void writeTo(DataOutput out) throws IOException {
+      out.writeVInt(value.length);
+      out.writeBytes(value.bytes, value.offset, value.length);
+    }
+
+    static BytesRef readFrom(DataInput in, BytesRef scratch) throws IOException {
+      scratch.length = in.readVInt();
+      if (scratch.bytes.length < scratch.length) {
+        scratch.bytes = ArrayUtil.grow(scratch.bytes, scratch.length);
+      }
+      in.readBytes(scratch.bytes, 0, scratch.length);
+      return scratch;
    }
   }
 
   /** An in-place update to a numeric DocValues field */
   static final class NumericDocValuesUpdate extends DocValuesUpdate {
+    private final long value;
 
-    NumericDocValuesUpdate(Term term, String field, Long value) {
-      super(DocValuesType.NUMERIC, term, field, value);
+    NumericDocValuesUpdate(Term term, String field, long value) {
+      this(term, field, value, BufferedUpdates.MAX_INT);
+    }
+
+    private NumericDocValuesUpdate(Term term, String field, long value, int docIDUpTo) {
+      super(DocValuesType.NUMERIC, term, field, docIDUpTo);
+      this.value = value;
+    }
+
+    NumericDocValuesUpdate prepareForApply(int docIDUpto) {
+      if (docIDUpto == this.docIDUpto) {
+        return this;
+      }
+      return new NumericDocValuesUpdate(term, field, value, docIDUpto);
    }
 
     @Override
     long valueSizeInBytes() {
       return Long.BYTES;
     }
+
+    @Override
+    protected String valueToString() {
+      return Long.toString(value);
+    }
+
+    @Override
+    void writeTo(DataOutput out) throws IOException {
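+      // writeZLong zig-zag encodes the value, so longs close to zero (whether
+      // positive or negative) take only a byte or two in the frozen buffer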
+      out.writeZLong(value);
+    }
+
+    static long readFrom(DataInput in) throws IOException {
+      return in.readZLong();
+    }
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index ad9c0d1627a..0db043abd03 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -25,7 +25,6 @@ import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
 import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.Accountable;
-import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.InfoStream;
 
 /**
@@ -412,10 +411,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
       for (DocValuesUpdate update : item) {
         switch (update.type) {
           case NUMERIC:
-            bufferedUpdates.addNumericUpdate(new NumericDocValuesUpdate(update.term, update.field, (Long) update.value), docIDUpto);
+            bufferedUpdates.addNumericUpdate((NumericDocValuesUpdate) update, docIDUpto);
             break;
           case BINARY:
-            bufferedUpdates.addBinaryUpdate(new BinaryDocValuesUpdate(update.term, update.field, (BytesRef) update.value), docIDUpto);
+            bufferedUpdates.addBinaryUpdate((BinaryDocValuesUpdate) update, docIDUpto);
             break;
           default:
             throw new IllegalArgumentException(update.type + " DocValues updates not supported yet!");
@@ -436,7 +435,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
       if (item.length > 0) {
         sb.append("term=").append(item[0].term).append("; updates: [");
         for (DocValuesUpdate update : item) {
-          sb.append(update.field).append(':').append(update.value).append(',');
+          sb.append(update.field).append(':').append(update.valueToString()).append(',');
         }
         sb.setCharAt(sb.length()-1, ']');
       }
diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
index bebc05941ad..6e4b6bdb181 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntConsumer;
 
 import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
 import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
@@ -44,6 +45,7 @@ import org.apache.lucene.store.RAMOutputStream;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.Counter;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.RamUsageEstimator;
@@ -76,8 +78,8 @@ final class FrozenBufferedUpdates {
   // binary DV update term and their updates
   final byte[] binaryDVUpdates;
 
-  private int numericDVUpdateCount;
-  private int binaryDVUpdateCount;
+  private final int numericDVUpdateCount;
+  private final int binaryDVUpdateCount;
 
   /** Counts down once all deletes/updates have been applied */
   public final CountDownLatch applied = new CountDownLatch(1);
@@ -116,19 +118,22 @@ final class FrozenBufferedUpdates {
       deleteQueryLimits[upto] = ent.getValue();
       upto++;
     }
-
+    Counter counter = Counter.newCounter();
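+    // this single counter is shared by the numeric and the binary freeze below; its
+    // value is captured into the corresponding count after each call, then reset to zero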
     // TODO if a Term affects multiple fields, we could keep the updates key'd by Term
     // so that it maps to all fields it affects, sorted by their docUpto, and traverse
     // that Term only once, applying the update to all fields that still need to be
     // updated.
-    numericDVUpdates = freezeNumericDVUpdates(updates.numericUpdates);
-
+    numericDVUpdates = freezeDVUpdates(updates.numericUpdates, counter::addAndGet);
+    numericDVUpdateCount = (int)counter.get();
+    counter.addAndGet(-counter.get());
+    assert counter.get() == 0;
     // TODO if a Term affects multiple fields, we could keep the updates key'd by Term
     // so that it maps to all fields it affects, sorted by their docUpto, and traverse
     // that Term only once, applying the update to all fields that still need to be
     // updated.
-    binaryDVUpdates = freezeBinaryDVUpdates(updates.binaryUpdates);
-
+    binaryDVUpdates = freezeDVUpdates(updates.binaryUpdates, counter::addAndGet);
+    binaryDVUpdateCount = (int)counter.get();
+
     bytesUsed = (int) (deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY
                        + numericDVUpdates.length + binaryDVUpdates.length);
@@ -141,60 +146,17 @@ final class FrozenBufferedUpdates {
     }
   }
 
-  private byte[] freezeNumericDVUpdates(Map<String,LinkedHashMap<Term,NumericDocValuesUpdate>> numericDVUpdates)
+  private static <T extends DocValuesUpdate> byte[] freezeDVUpdates(Map<String,LinkedHashMap<Term,T>> dvUpdates,
+                                                                    IntConsumer updateSizeConsumer)
     throws IOException {
     // TODO: we could do better here, e.g. collate the updates by field
     // so if you are updating 2 fields interleaved we don't keep writing the field strings
 
     try (RAMOutputStream out = new RAMOutputStream()) {
       String lastTermField = null;
       String lastUpdateField = null;
-      for (LinkedHashMap<Term,NumericDocValuesUpdate> numericUpdates : numericDVUpdates.values()) {
-        numericDVUpdateCount += numericUpdates.size();
-        for (NumericDocValuesUpdate update : numericUpdates.values()) {
-
-          int code = update.term.bytes().length << 2;
-
-          String termField = update.term.field();
-          if (termField.equals(lastTermField) == false) {
-            code |= 1;
-          }
-          String updateField = update.field;
-          if (updateField.equals(lastUpdateField) == false) {
-            code |= 2;
-          }
-          out.writeVInt(code);
-          out.writeVInt(update.docIDUpto);
-          if ((code & 1) != 0) {
-            out.writeString(termField);
-            lastTermField = termField;
-          }
-          if ((code & 2) != 0) {
-            out.writeString(updateField);
-            lastUpdateField = updateField;
-          }
-
-          out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length);
-          out.writeZLong(((Long) update.value).longValue());
-        }
-      }
-      byte[] bytes = new byte[(int) out.getFilePointer()];
-      out.writeTo(bytes, 0);
-      return bytes;
-    }
-  }
-
-  private byte[] freezeBinaryDVUpdates(Map<String,LinkedHashMap<Term,BinaryDocValuesUpdate>> binaryDVUpdates)
-    throws IOException {
-    // TODO: we could do better here, e.g. collate the updates by field
-    // so if you are updating 2 fields interleaved we don't keep writing the field strings
-
-    try (RAMOutputStream out = new RAMOutputStream()) {
-      String lastTermField = null;
-      String lastUpdateField = null;
-      for (LinkedHashMap<Term,BinaryDocValuesUpdate> binaryUpdates : binaryDVUpdates.values()) {
-        binaryDVUpdateCount += binaryUpdates.size();
-        for (BinaryDocValuesUpdate update : binaryUpdates.values()) {
-
+      for (LinkedHashMap<Term,T> updates : dvUpdates.values()) {
+        updateSizeConsumer.accept(updates.size());
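+        // reports how many updates this field contributes; the constructor sums these
+        // into numericDVUpdateCount / binaryDVUpdateCount through the shared counter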
+        for (T update : updates.values()) {
           int code = update.term.bytes().length << 2;
 
           String termField = update.term.field();
@@ -216,10 +178,7 @@ final class FrozenBufferedUpdates {
             lastUpdateField = updateField;
           }
           out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length);
-
-          BytesRef value = (BytesRef) update.value;
-          out.writeVInt(value.length);
-          out.writeBytes(value.bytes, value.offset, value.length);
+          update.writeTo(out);
         }
       }
       byte[] bytes = new byte[(int) out.getFilePointer()];
@@ -521,13 +480,13 @@ final class FrozenBufferedUpdates {
         // because we will run on the newly merged segment next:
         continue;
       }
-
+      final boolean isSegmentPrivateDeletes = privateSegment != null;
       if (numericDVUpdates.length > 0) {
-        updateCount += applyDocValuesUpdates(segState, numericDVUpdates, true);
+        updateCount += applyDocValuesUpdates(segState, numericDVUpdates, true, delGen, isSegmentPrivateDeletes);
       }
 
       if (binaryDVUpdates.length > 0) {
-        updateCount += applyDocValuesUpdates(segState, binaryDVUpdates, false);
+        updateCount += applyDocValuesUpdates(segState, binaryDVUpdates, false, delGen, isSegmentPrivateDeletes);
       }
     }
@@ -544,8 +503,9 @@ final class FrozenBufferedUpdates {
     return updateCount;
   }
 
-  private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState,
-                                     byte[] updates, boolean isNumeric) throws IOException {
+  private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState, byte[] updates,
+                                            boolean isNumeric, long delGen,
+                                            boolean segmentPrivateDeletes) throws IOException {
 
     TermsEnum termsEnum = null;
     PostingsEnum postingsEnum = null;
@@ -592,9 +552,9 @@ final class FrozenBufferedUpdates {
       }
       in.readBytes(term.bytes, 0, term.length);
 
-      int limit;
+      final int limit;
       if (delGen == segState.delGen) {
-        assert privateSegment != null;
+        assert segmentPrivateDeletes;
         limit = docIDUpto;
       } else {
         limit = Integer.MAX_VALUE;
@@ -622,17 +582,14 @@ final class FrozenBufferedUpdates {
         }
       }
 
-      // TODO: can we avoid boxing here w/o fully forking this method?
-      Object value;
+      final BytesRef binaryValue;
+      final long longValue;
       if (isNumeric) {
-        value = Long.valueOf(in.readZLong());
+        longValue = NumericDocValuesUpdate.readFrom(in);
+        binaryValue = null;
       } else {
-        value = scratch;
-        scratch.length = in.readVInt();
-        if (scratch.bytes.length < scratch.length) {
-          scratch.bytes = ArrayUtil.grow(scratch.bytes, scratch.length);
-        }
-        in.readBytes(scratch.bytes, 0, scratch.length);
+        longValue = -1;
+        binaryValue = BinaryDocValuesUpdate.readFrom(in, scratch);
       }
 
       if (termsEnum == null) {
@@ -641,10 +598,8 @@ final class FrozenBufferedUpdates {
       }
 
       if (termsEnum.seekExact(term)) {
-
         // we don't need term frequencies for this
         postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
-
         DocValuesFieldUpdates dvUpdates = holder.get(updateField);
         if (dvUpdates == null) {
           if (isNumeric) {
@@ -652,38 +607,38 @@ final class FrozenBufferedUpdates {
           } else {
             dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
           }
-
           holder.put(updateField, dvUpdates);
         }
-
-        if (segState.rld.sortMap != null && privateSegment != null) {
+        final IntConsumer docIdConsumer;
+        final DocValuesFieldUpdates update = dvUpdates;
+        if (isNumeric) {
+          docIdConsumer = doc -> update.add(doc, longValue);
+        } else {
+          docIdConsumer = doc -> update.add(doc, binaryValue);
+        }
+        final Bits acceptDocs = segState.rld.getLiveDocs();
+        if (segState.rld.sortMap != null && segmentPrivateDeletes) {
          // This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
           int doc;
-          final Bits acceptDocs = segState.rld.getLiveDocs();
           while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-
-            if (acceptDocs != null && acceptDocs.get(doc) == false) {
-              continue;
-            }
-
-            // The limit is in the pre-sorted doc space:
-            if (segState.rld.sortMap.newToOld(doc) < limit) {
-              dvUpdates.add(doc, value);
-              updateCount++;
+            if (acceptDocs == null || acceptDocs.get(doc)) {
+              // The limit is in the pre-sorted doc space:
+              if (segState.rld.sortMap.newToOld(doc) < limit) {
+                docIdConsumer.accept(doc);
+                updateCount++;
+              }
             }
           }
         } else {
           int doc;
-          final Bits acceptDocs = segState.rld.getLiveDocs();
           while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
             if (doc >= limit) {
               break; // no more docs that can be updated for this term
             }
-            if (acceptDocs != null && acceptDocs.get(doc) == false) {
-              continue;
+            if (acceptDocs == null || acceptDocs.get(doc)) {
+              docIdConsumer.accept(doc);
+              updateCount++;
             }
-            dvUpdates.add(doc, value);
-            updateCount++;
           }
         }
       }
@@ -888,7 +843,7 @@ final class FrozenBufferedUpdates {
       }
     }
     if (deleteQueries.length != 0) {
-      s += " numDeleteQuerys=" + deleteQueries.length;
+      s += " numDeleteQueries=" + deleteQueries.length;
     }
     if (numericDVUpdates.length > 0) {
       s += " numNumericDVUpdates=" + numericDVUpdateCount;
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index ccbfe5c3888..b2091462259 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -3699,7 +3699,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           int mappedDoc = segDocMap.get(segLeafDocMap.get(doc));
           if (mappedDoc != -1) {
             // not deleted
-            mappedUpdates.add(mappedDoc, it.value());
+            mappedUpdates.add(mappedDoc, it);
             anyDVUpdates = true;
           }
         }
diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
index 94a964354a1..724c58c63ed 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
@@ -18,6 +18,7 @@ package org.apache.lucene.index;
 
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.InPlaceMergeSorter;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.packed.PackedInts;
@@ -31,7 +32,7 @@ import org.apache.lucene.util.packed.PagedMutable;
  *
  * @lucene.experimental
  */
-class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
+final class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
   // TODO: can't this just be NumericDocValues now? avoid boxing the long value...
   final static class Iterator extends DocValuesFieldUpdates.Iterator {
     private final int size;
@@ -40,7 +41,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
     private final PagedMutable docs;
     private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
     private int doc = -1;
-    private Long value = null;
+    private long value;
     private final long delGen;
 
     Iterator(int size, PagedGrowableWriter values, PagedMutable docs, long delGen) {
@@ -50,15 +51,20 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
       this.delGen = delGen;
     }
+
     @Override
-    Long value() {
+    long longValue() {
       return value;
     }
-
+
+    @Override
+    BytesRef binaryValue() {
+      throw new UnsupportedOperationException();
+    }
+
     @Override
     public int nextDoc() {
       if (idx >= size) {
-        value = null;
         return doc = DocIdSetIterator.NO_MORE_DOCS;
       }
       doc = (int) docs.get(idx);
@@ -68,7 +74,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
         ++idx;
       }
       // idx points to the "next" element
-      value = Long.valueOf(values.get(idx - 1));
+      value = values.get(idx - 1);
       return doc;
     }
@@ -101,28 +107,36 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
   }
 
   @Override
-  public synchronized void add(int doc, Object value) {
+  void add(int doc, BytesRef value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  void add(int docId, DocValuesFieldUpdates.Iterator iterator) {
+    add(docId, iterator.longValue());
+  }
+
+  synchronized void add(int doc, long value) {
     if (finished) {
       throw new IllegalStateException("already finished");
     }
     assert doc < maxDoc;
-
+
     // TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
     if (size == Integer.MAX_VALUE) {
       throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
     }
-
-    Long val = (Long) value;
-
+
     // grow the structures to have room for more elements
     if (docs.size() == size) {
       docs = docs.grow(size + 1);
       values = values.grow(size + 1);
     }
-
+
     docs.set(size, doc);
-    values.set(size, val.longValue());
+    values.set(size, value);
     ++size;
   }
@@ -156,13 +170,13 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
         // increasing docID order:
         // NOTE: we can have ties here, when the same docID was updated in the same segment, in which case we rely on sort being
         // stable and preserving original order so the last update to that docID wins
-        return Integer.compare((int) docs.get(i), (int) docs.get(j));
+        return Long.compare(docs.get(i), docs.get(j));
       }
     }.sort(0, size);
   }
 
   @Override
-  public Iterator iterator() {
+  Iterator iterator() {
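+    // finish() sorts the buffered updates by docID; iterating before that would yield
+    // values in insertion order, hence the state check below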
     if (finished == false) {
       throw new IllegalStateException("call finish first");
     }
@@ -170,7 +184,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
   }
 
   @Override
-  public boolean any() {
+  boolean any() {
     return size > 0;
   }
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
index d61e8edf475..b31bc497981 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
@@ -146,12 +146,7 @@ final class ReadersAndUpdates {
     if (update.getFinished() == false) {
       throw new IllegalArgumentException("call finish first");
     }
-    List<DocValuesFieldUpdates> fieldUpdates = pendingDVUpdates.get(update.field);
-    if (fieldUpdates == null) {
-      fieldUpdates = new ArrayList<>();
-      pendingDVUpdates.put(update.field, fieldUpdates);
-    }
-
+    List<DocValuesFieldUpdates> fieldUpdates = pendingDVUpdates.computeIfAbsent(update.field, key -> new ArrayList<>());
     assert assertNoDupGen(fieldUpdates, update);
 
     ramBytesUsed.addAndGet(update.ramBytesUsed());
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocValuesFieldUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestDocValuesFieldUpdates.java
new file mode 100644
index 00000000000..2066dcdf9f9
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocValuesFieldUpdates.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestDocValuesFieldUpdates extends LuceneTestCase {
+
+  public void testMergeIterator() {
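+    // four packets over the same field with increasing delGens; when several packets
+    // update the same doc, the packet with the highest delGen (the most recent) must win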
+    NumericDocValuesFieldUpdates updates1 = new NumericDocValuesFieldUpdates(0, "test", 6);
+    NumericDocValuesFieldUpdates updates2 = new NumericDocValuesFieldUpdates(1, "test", 6);
+    NumericDocValuesFieldUpdates updates3 = new NumericDocValuesFieldUpdates(2, "test", 6);
+    NumericDocValuesFieldUpdates updates4 = new NumericDocValuesFieldUpdates(2, "test", 6);
+
+    updates1.add(0, 1);
+    updates1.add(4, 0);
+    updates1.add(1, 4);
+    updates1.add(2, 5);
+    updates1.add(4, 9);
+    assertTrue(updates1.any());
+
+    updates2.add(0, 18);
+    updates2.add(1, 7);
+    updates2.add(2, 19);
+    updates2.add(5, 24);
+    assertTrue(updates2.any());
+
+    updates3.add(2, 42);
+    assertTrue(updates3.any());
+    assertFalse(updates4.any());
+    updates1.finish();
+    updates2.finish();
+    updates3.finish();
+    updates4.finish();
+    List<DocValuesFieldUpdates.Iterator> iterators = Arrays.asList(updates1.iterator(), updates2.iterator(),
+        updates3.iterator(), updates4.iterator());
+    Collections.shuffle(iterators, random());
+    DocValuesFieldUpdates.Iterator iterator = DocValuesFieldUpdates
+        .mergedIterator(iterators.toArray(new DocValuesFieldUpdates.Iterator[0]));
+    assertEquals(0, iterator.nextDoc());
+    assertEquals(18, iterator.longValue());
+    assertEquals(1, iterator.nextDoc());
+    assertEquals(7, iterator.longValue());
+    assertEquals(2, iterator.nextDoc());
+    assertEquals(42, iterator.longValue());
+    assertEquals(4, iterator.nextDoc());
+    assertEquals(9, iterator.longValue());
+    assertEquals(5, iterator.nextDoc());
+    assertEquals(24, iterator.longValue());
+    assertEquals(DocIdSetIterator.NO_MORE_DOCS, iterator.nextDoc());
+  }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java b/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java
index 4df6d16e480..9119993ee88 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java
@@ -32,6 +32,7 @@ import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RAMDirectory;
 import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.Version;
@@ -201,7 +202,18 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
   private DocValuesFieldUpdates singleUpdate(List<Integer> docsDeleted, int maxDoc) {
     return new DocValuesFieldUpdates(maxDoc, 0, "_soft_deletes", DocValuesType.NUMERIC) {
       @Override
-      public void add(int doc, Object value) {
+      public void add(int doc, long value) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void add(int doc, BytesRef value) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void add(int docId, Iterator iterator) {
+        throw new UnsupportedOperationException();
       }
 
       @Override
@@ -216,13 +228,18 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
       }
 
       @Override
-      public int docID() {
-        return doc;
+      long longValue() {
+        return 1;
       }
 
       @Override
-      Object value() {
-        return 1;
+      BytesRef binaryValue() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int docID() {
+        return doc;
       }
 
       @Override