Karl Wright 2018-04-30 06:12:46 -04:00
commit 871ffbe10e
12 changed files with 374 additions and 242 deletions

View File

@ -33,7 +33,7 @@ import org.apache.lucene.util.packed.PagedMutable;
*
* @lucene.experimental
*/
class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
final class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
final static class Iterator extends DocValuesFieldUpdates.Iterator {
private final int size;
@ -55,14 +55,14 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
value = values.clone();
this.delGen = delGen;
}
@Override
BytesRef value() {
BytesRef binaryValue() {
value.offset = offset;
value.length = length;
return value;
}
@Override
public int nextDoc() {
if (idx >= size) {
@ -94,6 +94,11 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
long delGen() {
return delGen;
}
@Override
long longValue() {
throw new UnsupportedOperationException();
}
}
private PagedMutable docs;
@ -116,9 +121,18 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
return size;
}
// NOTE: we fully consume the incoming BytesRef so caller is free to reuse it after we return:
@Override
synchronized public void add(int doc, Object value) {
public void add(int doc, long value) {
throw new UnsupportedOperationException();
}
@Override
public void add(int docId, DocValuesFieldUpdates.Iterator iterator) {
add(docId, iterator.binaryValue());
}
@Override
synchronized public void add(int doc, BytesRef value) {
if (finished) {
throw new IllegalStateException("already finished");
}
@ -130,8 +144,6 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
}
BytesRef val = (BytesRef) value;
// grow the structures to have room for more elements
if (docs.size() == size) {
docs = docs.grow(size + 1);
@ -141,8 +153,8 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
docs.set(size, doc);
offsets.set(size, values.length());
lengths.set(size, val.length);
values.append(val);
lengths.set(size, value.length);
values.append(value);
++size;
}
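The NOTE above is the contract that makes the new typed add(int, BytesRef) cheap for callers: the bytes are copied into the internal structures, so the same scratch BytesRef can be reused between calls. A minimal sketch of that reuse, assuming package-private access and a hypothetical field name and values:

// The value is fully consumed by add(), so one scratch BytesRef is reused per call
// instead of allocating a new one for every document.
int maxDoc = 4;
BinaryDocValuesFieldUpdates updates = new BinaryDocValuesFieldUpdates(0L, "field", maxDoc);
BytesRef scratch = new BytesRef(new byte[] {0});
for (int doc = 0; doc < maxDoc; doc++) {
  scratch.bytes[0] = (byte) doc; // overwrite in place between calls
  updates.add(doc, scratch);     // safe: add() appends the bytes, it does not keep a reference
}
updates.finish();                // sort by docID before iterator() is called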

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
@ -233,62 +234,49 @@ class BufferedUpdates {
}
}
public void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) {
LinkedHashMap<Term,NumericDocValuesUpdate> fieldUpdates = numericUpdates.get(update.field);
if (fieldUpdates == null) {
fieldUpdates = new LinkedHashMap<>();
numericUpdates.put(update.field, fieldUpdates);
bytesUsed.addAndGet(BYTES_PER_NUMERIC_FIELD_ENTRY);
}
final NumericDocValuesUpdate current = fieldUpdates.get(update.term);
if (current != null && docIDUpto < current.docIDUpto) {
// Only record the new number if it's greater than or equal to the current
// one. This is important because if multiple threads are replacing the
// same doc at nearly the same time, it's possible that one thread that
// got a higher docID is scheduled before the other threads.
return;
}
update.docIDUpto = docIDUpto;
// since it's a LinkedHashMap, we must first remove the Term entry so that
// it's added last (we're interested in insertion-order).
if (current != null) {
fieldUpdates.remove(update.term);
}
fieldUpdates.put(update.term, update);
numNumericUpdates.incrementAndGet();
if (current == null) {
bytesUsed.addAndGet(BYTES_PER_NUMERIC_UPDATE_ENTRY + update.sizeInBytes());
void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) {
if (addDocValuesUpdate(numericUpdates, update, docIDUpto, update::prepareForApply, BYTES_PER_NUMERIC_UPDATE_ENTRY,
BYTES_PER_NUMERIC_FIELD_ENTRY)) {
numNumericUpdates.incrementAndGet();
}
}
public void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) {
LinkedHashMap<Term,BinaryDocValuesUpdate> fieldUpdates = binaryUpdates.get(update.field);
void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) {
if (addDocValuesUpdate(binaryUpdates, update, docIDUpto, update::prepareForApply, BYTES_PER_BINARY_UPDATE_ENTRY,
BYTES_PER_BINARY_FIELD_ENTRY)) {
numBinaryUpdates.incrementAndGet();
}
}
private <T extends DocValuesUpdate> boolean addDocValuesUpdate(Map<String,LinkedHashMap<Term,T>> updates, T update,
int docIDUpto, IntFunction<T> prepareForApply,
long bytesPerUpdateEntry, long bytesPerFieldEntry) {
LinkedHashMap<Term,T> fieldUpdates = updates.get(update.field);
if (fieldUpdates == null) {
fieldUpdates = new LinkedHashMap<>();
binaryUpdates.put(update.field, fieldUpdates);
bytesUsed.addAndGet(BYTES_PER_BINARY_FIELD_ENTRY);
updates.put(update.field, fieldUpdates);
bytesUsed.addAndGet(bytesPerFieldEntry);
}
final BinaryDocValuesUpdate current = fieldUpdates.get(update.term);
final T current = fieldUpdates.get(update.term);
if (current != null && docIDUpto < current.docIDUpto) {
// Only record the new number if it's greater than or equal to the current
// one. This is important because if multiple threads are replacing the
// same doc at nearly the same time, it's possible that one thread that
// got a higher docID is scheduled before the other threads.
return;
return false;
}
update.docIDUpto = docIDUpto;
// since it's a LinkedHashMap, we must first remove the Term entry so that
// it's added last (we're interested in insertion-order).
if (current != null) {
fieldUpdates.remove(update.term);
}
fieldUpdates.put(update.term, update);
numBinaryUpdates.incrementAndGet();
fieldUpdates.put(update.term, prepareForApply.apply(docIDUpto)); // only make a copy if necessary
if (current == null) {
bytesUsed.addAndGet(BYTES_PER_BINARY_UPDATE_ENTRY + update.sizeInBytes());
bytesUsed.addAndGet(bytesPerUpdateEntry + update.sizeInBytes());
}
return true;
}
void clearDeleteTerms() {
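addNumericUpdate and addBinaryUpdate now share one generic helper, and the per-term map stores update.prepareForApply(docIDUpto) rather than mutating a shared instance. The pattern is a small copy-on-write: a freshly buffered update already carries the BufferedUpdates.MAX_INT sentinel, so the common case returns the existing object and only a conflicting docIDUpto forces a copy. A simplified, standalone sketch of that idea (this Update class is illustrative, not Lucene API):

// Copy-on-write update: fields are immutable, and a copy is made only when the
// caller needs a different docIDUpto than the one already stored.
final class Update {
  final String field;
  final long value;
  final int docIDUpto;

  Update(String field, long value, int docIDUpto) {
    this.field = field;
    this.value = value;
    this.docIDUpto = docIDUpto;
  }

  Update prepareForApply(int docIDUpto) {
    // most updates are buffered with the "applies to all docs" sentinel,
    // so this usually returns the existing instance and saves an allocation
    return docIDUpto == this.docIDUpto ? this : new Update(field, value, docIDUpto);
  }
}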

View File

@ -16,6 +16,7 @@
*/
package org.apache.lucene.index;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
@ -26,7 +27,7 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
*
* @lucene.experimental
*/
abstract class DocValuesFieldUpdates {
abstract class DocValuesFieldUpdates implements Accountable {
protected static final int PAGE_SIZE = 1024;
@ -51,13 +52,18 @@ abstract class DocValuesFieldUpdates {
throw new UnsupportedOperationException();
}
@Override
public abstract int nextDoc(); // no IOException
/**
* Returns the value of the document returned from {@link #nextDoc()}. A
* {@code null} value means that it was unset for this document.
* Returns a long value for the current document if this iterator is a long iterator.
*/
abstract Object value();
abstract long longValue();
/**
* Returns a binary value for the current document if this iterator is a binary value iterator.
*/
abstract BytesRef binaryValue();
/** Returns delGen for this packet. */
abstract long delGen();
@ -73,7 +79,7 @@ abstract class DocValuesFieldUpdates {
}
@Override
public BytesRef binaryValue() {
return (BytesRef) iterator.value();
return iterator.binaryValue();
}
@Override
public boolean advanceExact(int target) {
@ -100,7 +106,7 @@ abstract class DocValuesFieldUpdates {
return new NumericDocValues() {
@Override
public long longValue() {
return ((Long)iterator.value()).longValue();
return iterator.longValue();
}
@Override
public boolean advanceExact(int target) {
@ -163,14 +169,9 @@ abstract class DocValuesFieldUpdates {
}
return new Iterator() {
private int doc;
private boolean first = true;
private int doc = -1;
@Override
public int nextDoc() {
// TODO: can we do away with this first boolean?
if (first == false) {
// Advance all sub iterators past current doc
while (true) {
if (queue.size() == 0) {
@ -189,21 +190,22 @@ abstract class DocValuesFieldUpdates {
queue.updateTop();
}
}
} else {
doc = queue.top().docID();
first = false;
}
return doc;
}
@Override
public int docID() {
return doc;
}
@Override
public Object value() {
return queue.top().value();
long longValue() {
return queue.top().longValue();
}
@Override
BytesRef binaryValue() {
return queue.top().binaryValue();
}
@Override
@ -229,31 +231,34 @@ abstract class DocValuesFieldUpdates {
this.type = type;
}
public boolean getFinished() {
boolean getFinished() {
return finished;
}
abstract void add(int doc, long value);
abstract void add(int doc, BytesRef value);
/**
* Add an update to a document. For unsetting a value you should pass
* {@code null}.
* Adds the value for the given docID.
* This method prevents conditional calls to {@link Iterator#longValue()} or {@link Iterator#binaryValue()}
since the implementation knows whether it's a long value iterator or a binary value iterator.
*/
public abstract void add(int doc, Object value);
abstract void add(int docId, Iterator iterator);
/**
* Returns an {@link Iterator} over the updated documents and their
* values.
*/
// TODO: also use this for merging, instead of having to write through to disk first
public abstract Iterator iterator();
abstract Iterator iterator();
/** Freezes internal data structures and sorts updates by docID for efficient iteration. */
public abstract void finish();
abstract void finish();
/** Returns true if this instance contains any updates. */
public abstract boolean any();
abstract boolean any();
/** Returns approximate RAM bytes used. */
public abstract long ramBytesUsed();
abstract int size();
public abstract int size();
}
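With the typed longValue()/binaryValue() accessors and the new add(int docId, Iterator) overload, callers no longer box a Long or cast an Object per document. A rough sketch of copying one finished container into another (source and dest are hypothetical DocValuesFieldUpdates instances of the same type; IndexWriter does the equivalent below when mapping updates onto a merged segment):

// Transfer every update; the destination decides whether to call longValue() or
// binaryValue(), so this loop stays type-agnostic and avoids per-document boxing.
DocValuesFieldUpdates.Iterator it = source.iterator(); // source must already be finish()ed
int doc;
while ((doc = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
  dest.add(doc, it);
}
dest.finish();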

View File

@ -16,11 +16,16 @@
*/
package org.apache.lucene.index;
import java.io.IOException;
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
/** An in-place update to a DocValues field. */
@ -38,21 +43,22 @@ abstract class DocValuesUpdate {
final DocValuesType type;
final Term term;
final String field;
final Object value;
int docIDUpto = -1; // unassigned until applied, and confusing that it's here, when it's just used in BufferedDeletes...
// used in BufferedDeletes to apply this update only to a slice of docs. It's initialized to BufferedUpdates.MAX_INT
// since it's safe and most often used this way we save object creations.
final int docIDUpto;
/**
* Constructor.
*
* @param term the {@link Term} which determines the documents that will be updated
* @param field the {@link NumericDocValuesField} to update
* @param value the updated value
*/
protected DocValuesUpdate(DocValuesType type, Term term, String field, Object value) {
protected DocValuesUpdate(DocValuesType type, Term term, String field, int docIDUpto) {
assert docIDUpto >= 0 : docIDUpto + " must be >= 0";
this.type = type;
this.term = term;
this.field = field;
this.value = value;
this.docIDUpto = docIDUpto;
}
abstract long valueSizeInBytes();
@ -65,38 +71,102 @@ abstract class DocValuesUpdate {
sizeInBytes += valueSizeInBytes();
return sizeInBytes;
}
protected abstract String valueToString();
abstract void writeTo(DataOutput output) throws IOException;
@Override
public String toString() {
return "term=" + term + ",field=" + field + ",value=" + value + ",docIDUpto=" + docIDUpto;
return "term=" + term + ",field=" + field + ",value=" + valueToString() + ",docIDUpto=" + docIDUpto;
}
/** An in-place update to a binary DocValues field */
static final class BinaryDocValuesUpdate extends DocValuesUpdate {
private final BytesRef value;
/* Size of BytesRef: 2*INT + ARRAY_HEADER + PTR */
private static final long RAW_VALUE_SIZE_IN_BYTES = NUM_BYTES_ARRAY_HEADER + 2*Integer.BYTES + NUM_BYTES_OBJECT_REF;
BinaryDocValuesUpdate(Term term, String field, BytesRef value) {
super(DocValuesType.BINARY, term, field, value);
this(term, field, value, BufferedUpdates.MAX_INT);
}
private BinaryDocValuesUpdate(Term term, String field, BytesRef value, int docIDUpTo) {
super(DocValuesType.BINARY, term, field, docIDUpTo);
this.value = value;
}
BinaryDocValuesUpdate prepareForApply(int docIDUpto) {
if (docIDUpto == this.docIDUpto) {
return this; // it's a final value so we can safely reuse this instance
}
return new BinaryDocValuesUpdate(term, field, value, docIDUpto);
}
@Override
long valueSizeInBytes() {
return RAW_VALUE_SIZE_IN_BYTES + ((BytesRef) value).bytes.length;
return RAW_VALUE_SIZE_IN_BYTES + value.bytes.length;
}
@Override
protected String valueToString() {
return value.toString();
}
@Override
void writeTo(DataOutput out) throws IOException {
out.writeVInt(value.length);
out.writeBytes(value.bytes, value.offset, value.length);
}
static BytesRef readFrom(DataInput in, BytesRef scratch) throws IOException {
scratch.length = in.readVInt();
if (scratch.bytes.length < scratch.length) {
scratch.bytes = ArrayUtil.grow(scratch.bytes, scratch.length);
}
in.readBytes(scratch.bytes, 0, scratch.length);
return scratch;
}
}
/** An in-place update to a numeric DocValues field */
static final class NumericDocValuesUpdate extends DocValuesUpdate {
private final long value;
NumericDocValuesUpdate(Term term, String field, Long value) {
super(DocValuesType.NUMERIC, term, field, value);
NumericDocValuesUpdate(Term term, String field, long value) {
this(term, field, value, BufferedUpdates.MAX_INT);
}
private NumericDocValuesUpdate(Term term, String field, long value, int docIDUpTo) {
super(DocValuesType.NUMERIC, term, field, docIDUpTo);
this.value = value;
}
NumericDocValuesUpdate prepareForApply(int docIDUpto) {
if (docIDUpto == this.docIDUpto) {
return this;
}
return new NumericDocValuesUpdate(term, field, value, docIDUpto);
}
@Override
long valueSizeInBytes() {
return Long.BYTES;
}
@Override
protected String valueToString() {
return Long.toString(value);
}
@Override
void writeTo(DataOutput out) throws IOException {
out.writeZLong(value);
}
static long readFrom(DataInput in) throws IOException {
return in.readZLong();
}
}
}
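The subclasses now own their value serialization through writeTo/readFrom instead of exposing a raw Object. A hedged round-trip sketch for the binary variant, assuming package-private access; RAMOutputStream and ByteArrayDataInput are used here only for illustration, mirroring how FrozenBufferedUpdates freezes updates into a byte[]:

// Write one binary update's value and read it back into a reusable scratch buffer.
// (The enclosing method would declare throws IOException.)
BytesRef value = new BytesRef("updated-value");
BinaryDocValuesUpdate update = new BinaryDocValuesUpdate(new Term("id", "1"), "dv-field", value);
try (RAMOutputStream out = new RAMOutputStream()) {
  update.writeTo(out);                                  // VInt length followed by the raw bytes
  byte[] frozen = new byte[(int) out.getFilePointer()];
  out.writeTo(frozen, 0);
  ByteArrayDataInput in = new ByteArrayDataInput(frozen);
  BytesRef scratch = new BytesRef(new byte[1]);         // grown on demand by readFrom
  BytesRef read = BinaryDocValuesUpdate.readFrom(in, scratch);
  assert read.bytesEquals(value);
}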

View File

@ -25,7 +25,6 @@ import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
/**
@ -412,10 +411,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
for (DocValuesUpdate update : item) {
switch (update.type) {
case NUMERIC:
bufferedUpdates.addNumericUpdate(new NumericDocValuesUpdate(update.term, update.field, (Long) update.value), docIDUpto);
bufferedUpdates.addNumericUpdate((NumericDocValuesUpdate) update, docIDUpto);
break;
case BINARY:
bufferedUpdates.addBinaryUpdate(new BinaryDocValuesUpdate(update.term, update.field, (BytesRef) update.value), docIDUpto);
bufferedUpdates.addBinaryUpdate((BinaryDocValuesUpdate) update, docIDUpto);
break;
default:
throw new IllegalArgumentException(update.type + " DocValues updates not supported yet!");
@ -436,7 +435,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
if (item.length > 0) {
sb.append("term=").append(item[0].term).append("; updates: [");
for (DocValuesUpdate update : item) {
sb.append(update.field).append(':').append(update.value).append(',');
sb.append(update.field).append(':').append(update.valueToString()).append(',');
}
sb.setCharAt(sb.length()-1, ']');
}

View File

@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
@ -44,6 +45,7 @@ import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;
@ -76,8 +78,8 @@ final class FrozenBufferedUpdates {
// binary DV update term and their updates
final byte[] binaryDVUpdates;
private int numericDVUpdateCount;
private int binaryDVUpdateCount;
private final int numericDVUpdateCount;
private final int binaryDVUpdateCount;
/** Counts down once all deletes/updates have been applied */
public final CountDownLatch applied = new CountDownLatch(1);
@ -116,19 +118,22 @@ final class FrozenBufferedUpdates {
deleteQueryLimits[upto] = ent.getValue();
upto++;
}
Counter counter = Counter.newCounter();
// TODO if a Term affects multiple fields, we could keep the updates key'd by Term
// so that it maps to all fields it affects, sorted by their docUpto, and traverse
// that Term only once, applying the update to all fields that still need to be
// updated.
numericDVUpdates = freezeNumericDVUpdates(updates.numericUpdates);
numericDVUpdates = freezeDVUpdates(updates.numericUpdates, counter::addAndGet);
numericDVUpdateCount = (int)counter.get();
counter.addAndGet(-counter.get());
assert counter.get() == 0;
// TODO if a Term affects multiple fields, we could keep the updates key'd by Term
// so that it maps to all fields it affects, sorted by their docUpto, and traverse
// that Term only once, applying the update to all fields that still need to be
// updated.
binaryDVUpdates = freezeBinaryDVUpdates(updates.binaryUpdates);
binaryDVUpdates = freezeDVUpdates(updates.binaryUpdates, counter::addAndGet);
binaryDVUpdateCount = (int)counter.get();
bytesUsed = (int) (deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY
+ numericDVUpdates.length + binaryDVUpdates.length);
@ -141,60 +146,17 @@ final class FrozenBufferedUpdates {
}
}
private byte[] freezeNumericDVUpdates(Map<String,LinkedHashMap<Term,NumericDocValuesUpdate>> numericDVUpdates)
private static <T extends DocValuesUpdate> byte[] freezeDVUpdates(Map<String,LinkedHashMap<Term, T>> dvUpdates,
IntConsumer updateSizeConsumer)
throws IOException {
// TODO: we could do better here, e.g. collate the updates by field
// so if you are updating 2 fields interleaved we don't keep writing the field strings
try (RAMOutputStream out = new RAMOutputStream()) {
String lastTermField = null;
String lastUpdateField = null;
for (LinkedHashMap<Term, NumericDocValuesUpdate> numericUpdates : numericDVUpdates.values()) {
numericDVUpdateCount += numericUpdates.size();
for (NumericDocValuesUpdate update : numericUpdates.values()) {
int code = update.term.bytes().length << 2;
String termField = update.term.field();
if (termField.equals(lastTermField) == false) {
code |= 1;
}
String updateField = update.field;
if (updateField.equals(lastUpdateField) == false) {
code |= 2;
}
out.writeVInt(code);
out.writeVInt(update.docIDUpto);
if ((code & 1) != 0) {
out.writeString(termField);
lastTermField = termField;
}
if ((code & 2) != 0) {
out.writeString(updateField);
lastUpdateField = updateField;
}
out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length);
out.writeZLong(((Long) update.value).longValue());
}
}
byte[] bytes = new byte[(int) out.getFilePointer()];
out.writeTo(bytes, 0);
return bytes;
}
}
private byte[] freezeBinaryDVUpdates(Map<String,LinkedHashMap<Term,BinaryDocValuesUpdate>> binaryDVUpdates)
throws IOException {
// TODO: we could do better here, e.g. collate the updates by field
// so if you are updating 2 fields interleaved we don't keep writing the field strings
try (RAMOutputStream out = new RAMOutputStream()) {
String lastTermField = null;
String lastUpdateField = null;
for (LinkedHashMap<Term, BinaryDocValuesUpdate> binaryUpdates : binaryDVUpdates.values()) {
binaryDVUpdateCount += binaryUpdates.size();
for (BinaryDocValuesUpdate update : binaryUpdates.values()) {
for (LinkedHashMap<Term, T> updates : dvUpdates.values()) {
updateSizeConsumer.accept(updates.size());
for (T update : updates.values()) {
int code = update.term.bytes().length << 2;
String termField = update.term.field();
@ -216,10 +178,7 @@ final class FrozenBufferedUpdates {
lastUpdateField = updateField;
}
out.writeBytes(update.term.bytes().bytes, update.term.bytes().offset, update.term.bytes().length);
BytesRef value = (BytesRef) update.value;
out.writeVInt(value.length);
out.writeBytes(value.bytes, value.offset, value.length);
update.writeTo(out);
}
}
byte[] bytes = new byte[(int) out.getFilePointer()];
@ -521,13 +480,13 @@ final class FrozenBufferedUpdates {
// because we will run on the newly merged segment next:
continue;
}
final boolean isSegmentPrivateDeletes = privateSegment != null;
if (numericDVUpdates.length > 0) {
updateCount += applyDocValuesUpdates(segState, numericDVUpdates, true);
updateCount += applyDocValuesUpdates(segState, numericDVUpdates, true, delGen, isSegmentPrivateDeletes);
}
if (binaryDVUpdates.length > 0) {
updateCount += applyDocValuesUpdates(segState, binaryDVUpdates, false);
updateCount += applyDocValuesUpdates(segState, binaryDVUpdates, false, delGen, isSegmentPrivateDeletes);
}
}
@ -544,8 +503,9 @@ final class FrozenBufferedUpdates {
return updateCount;
}
private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState,
byte[] updates, boolean isNumeric) throws IOException {
private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState, byte[] updates,
boolean isNumeric, long delGen,
boolean segmentPrivateDeletes) throws IOException {
TermsEnum termsEnum = null;
PostingsEnum postingsEnum = null;
@ -592,9 +552,9 @@ final class FrozenBufferedUpdates {
}
in.readBytes(term.bytes, 0, term.length);
int limit;
final int limit;
if (delGen == segState.delGen) {
assert privateSegment != null;
assert segmentPrivateDeletes;
limit = docIDUpto;
} else {
limit = Integer.MAX_VALUE;
@ -622,17 +582,14 @@ final class FrozenBufferedUpdates {
}
}
// TODO: can we avoid boxing here w/o fully forking this method?
Object value;
final BytesRef binaryValue;
final long longValue;
if (isNumeric) {
value = Long.valueOf(in.readZLong());
longValue = NumericDocValuesUpdate.readFrom(in);
binaryValue = null;
} else {
value = scratch;
scratch.length = in.readVInt();
if (scratch.bytes.length < scratch.length) {
scratch.bytes = ArrayUtil.grow(scratch.bytes, scratch.length);
}
in.readBytes(scratch.bytes, 0, scratch.length);
longValue = -1;
binaryValue = BinaryDocValuesUpdate.readFrom(in, scratch);
}
if (termsEnum == null) {
@ -641,10 +598,8 @@ final class FrozenBufferedUpdates {
}
if (termsEnum.seekExact(term)) {
// we don't need term frequencies for this
postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
DocValuesFieldUpdates dvUpdates = holder.get(updateField);
if (dvUpdates == null) {
if (isNumeric) {
@ -652,38 +607,38 @@ final class FrozenBufferedUpdates {
} else {
dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
}
holder.put(updateField, dvUpdates);
}
if (segState.rld.sortMap != null && privateSegment != null) {
final IntConsumer docIdConsumer;
final DocValuesFieldUpdates update = dvUpdates;
if (isNumeric) {
docIdConsumer = doc -> update.add(doc, longValue);
} else {
docIdConsumer = doc -> update.add(doc, binaryValue);
}
final Bits acceptDocs = segState.rld.getLiveDocs();
if (segState.rld.sortMap != null && segmentPrivateDeletes) {
// This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
int doc;
final Bits acceptDocs = segState.rld.getLiveDocs();
while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (acceptDocs != null && acceptDocs.get(doc) == false) {
continue;
}
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(doc) < limit) {
dvUpdates.add(doc, value);
updateCount++;
if (acceptDocs == null || acceptDocs.get(doc)) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(doc) < limit) {
docIdConsumer.accept(doc);
updateCount++;
}
}
}
} else {
int doc;
final Bits acceptDocs = segState.rld.getLiveDocs();
while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (doc >= limit) {
break; // no more docs that can be updated for this term
}
if (acceptDocs != null && acceptDocs.get(doc) == false) {
continue;
if (acceptDocs == null || acceptDocs.get(doc)) {
docIdConsumer.accept(doc);
updateCount++;
}
dvUpdates.add(doc, value);
updateCount++;
}
}
}
@ -888,7 +843,7 @@ final class FrozenBufferedUpdates {
}
}
if (deleteQueries.length != 0) {
s += " numDeleteQuerys=" + deleteQueries.length;
s += " numDeleteQueries=" + deleteQueries.length;
}
if (numericDVUpdates.length > 0) {
s += " numNumericDVUpdates=" + numericDVUpdateCount;

View File

@ -3699,7 +3699,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
int mappedDoc = segDocMap.get(segLeafDocMap.get(doc));
if (mappedDoc != -1) {
// not deleted
mappedUpdates.add(mappedDoc, it.value());
mappedUpdates.add(mappedDoc, it);
anyDVUpdates = true;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.lucene.index;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InPlaceMergeSorter;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.packed.PackedInts;
@ -31,7 +32,7 @@ import org.apache.lucene.util.packed.PagedMutable;
*
* @lucene.experimental
*/
class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
final class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
// TODO: can't this just be NumericDocValues now? avoid boxing the long value...
final static class Iterator extends DocValuesFieldUpdates.Iterator {
@ -40,7 +41,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
private final PagedMutable docs;
private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
private int doc = -1;
private Long value = null;
private long value;
private final long delGen;
Iterator(int size, PagedGrowableWriter values, PagedMutable docs, long delGen) {
@ -50,15 +51,20 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
this.delGen = delGen;
}
@Override
Long value() {
long longValue() {
return value;
}
@Override
BytesRef binaryValue() {
throw new UnsupportedOperationException();
}
@Override
public int nextDoc() {
if (idx >= size) {
value = null;
return doc = DocIdSetIterator.NO_MORE_DOCS;
}
doc = (int) docs.get(idx);
@ -68,7 +74,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
++idx;
}
// idx points to the "next" element
value = Long.valueOf(values.get(idx - 1));
value = values.get(idx - 1);
return doc;
}
@ -101,28 +107,36 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
public synchronized void add(int doc, Object value) {
void add(int doc, BytesRef value) {
throw new UnsupportedOperationException();
}
@Override
void add(int docId, DocValuesFieldUpdates.Iterator iterator) {
add(docId, iterator.longValue());
}
synchronized void add(int doc, long value) {
if (finished) {
throw new IllegalStateException("already finished");
}
assert doc < maxDoc;
// TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
if (size == Integer.MAX_VALUE) {
throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
}
Long val = (Long) value;
// grow the structures to have room for more elements
if (docs.size() == size) {
docs = docs.grow(size + 1);
values = values.grow(size + 1);
}
docs.set(size, doc);
values.set(size, val.longValue());
values.set(size, value);
++size;
}
@ -156,13 +170,13 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
// increasing docID order:
// NOTE: we can have ties here, when the same docID was updated in the same segment, in which case we rely on sort being
// stable and preserving original order so the last update to that docID wins
return Integer.compare((int) docs.get(i), (int) docs.get(j));
return Long.compare(docs.get(i), docs.get(j));
}
}.sort(0, size);
}
@Override
public Iterator iterator() {
Iterator iterator() {
if (finished == false) {
throw new IllegalStateException("call finish first");
}
@ -170,7 +184,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
public boolean any() {
boolean any() {
return size > 0;
}
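The comparator NOTE above leans on the sort being stable: if the same docID was added twice in one segment, both entries keep their insertion order and the iterator skips ahead to the last one, so the most recent update wins. A quick illustration against the package-private API (field name and values are arbitrary):

NumericDocValuesFieldUpdates updates = new NumericDocValuesFieldUpdates(0, "field", 3);
updates.add(2, 7);   // first update to doc 2
updates.add(2, 42);  // a later update to the same doc wins after finish()
updates.finish();
DocValuesFieldUpdates.Iterator it = updates.iterator();
assert it.nextDoc() == 2 && it.longValue() == 42;
assert it.nextDoc() == DocIdSetIterator.NO_MORE_DOCS;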

View File

@ -146,12 +146,7 @@ final class ReadersAndUpdates {
if (update.getFinished() == false) {
throw new IllegalArgumentException("call finish first");
}
List<DocValuesFieldUpdates> fieldUpdates = pendingDVUpdates.get(update.field);
if (fieldUpdates == null) {
fieldUpdates = new ArrayList<>();
pendingDVUpdates.put(update.field, fieldUpdates);
}
List<DocValuesFieldUpdates> fieldUpdates = pendingDVUpdates.computeIfAbsent(update.field, key -> new ArrayList<>());
assert assertNoDupGen(fieldUpdates, update);
ramBytesUsed.addAndGet(update.ramBytesUsed());

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.LuceneTestCase;
public class TestDocValuesFieldUpdates extends LuceneTestCase {
public void testMergeIterator() {
NumericDocValuesFieldUpdates updates1 = new NumericDocValuesFieldUpdates(0, "test", 6);
NumericDocValuesFieldUpdates updates2 = new NumericDocValuesFieldUpdates(1, "test", 6);
NumericDocValuesFieldUpdates updates3 = new NumericDocValuesFieldUpdates(2, "test", 6);
NumericDocValuesFieldUpdates updates4 = new NumericDocValuesFieldUpdates(2, "test", 6);
updates1.add(0, 1);
updates1.add(4, 0);
updates1.add(1, 4);
updates1.add(2, 5);
updates1.add(4, 9);
assertTrue(updates1.any());
updates2.add(0, 18);
updates2.add(1, 7);
updates2.add(2, 19);
updates2.add(5, 24);
assertTrue(updates2.any());
updates3.add(2, 42);
assertTrue(updates3.any());
assertFalse(updates4.any());
updates1.finish();
updates2.finish();
updates3.finish();
updates4.finish();
List<DocValuesFieldUpdates.Iterator> iterators = Arrays.asList(updates1.iterator(), updates2.iterator(),
updates3.iterator(), updates4.iterator());
Collections.shuffle(iterators, random());
DocValuesFieldUpdates.Iterator iterator = DocValuesFieldUpdates
.mergedIterator(iterators.toArray(new DocValuesFieldUpdates.Iterator[0]));
assertEquals(0, iterator.nextDoc());
assertEquals(18, iterator.longValue());
assertEquals(1, iterator.nextDoc());
assertEquals(7, iterator.longValue());
assertEquals(2, iterator.nextDoc());
assertEquals(42, iterator.longValue());
assertEquals(4, iterator.nextDoc());
assertEquals(9, iterator.longValue());
assertEquals(5, iterator.nextDoc());
assertEquals(24, iterator.longValue());
assertEquals(DocIdSetIterator.NO_MORE_DOCS, iterator.nextDoc());
}
}

View File

@ -32,6 +32,7 @@ import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
@ -201,7 +202,18 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
private DocValuesFieldUpdates singleUpdate(List<Integer> docsDeleted, int maxDoc) {
return new DocValuesFieldUpdates(maxDoc, 0, "_soft_deletes", DocValuesType.NUMERIC) {
@Override
public void add(int doc, Object value) {
public void add(int doc, long value) {
throw new UnsupportedOperationException();
}
@Override
public void add(int doc, BytesRef value) {
throw new UnsupportedOperationException();
}
@Override
public void add(int docId, Iterator iterator) {
throw new UnsupportedOperationException();
}
@Override
@ -216,13 +228,18 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
}
@Override
public int docID() {
return doc;
long longValue() {
return 1;
}
@Override
Object value() {
return 1;
BytesRef binaryValue() {
throw new UnsupportedOperationException();
}
@Override
public int docID() {
return doc;
}
@Override

View File

@ -44,6 +44,7 @@ import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.junit.Test;
@ -191,37 +192,41 @@ public class TestDirectoryTaxonomyWriter extends FacetTestCase {
// DirTaxoWriter lost the INDEX_EPOCH property if it was opened in
// CREATE_OR_APPEND (or commit(userData) called twice), which could lead to
// DirTaxoReader succeeding to refresh().
Directory dir = newDirectory();
DirectoryTaxonomyWriter taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE_OR_APPEND, NO_OP_CACHE);
touchTaxo(taxoWriter, new FacetLabel("a"));
TaxonomyReader taxoReader = new DirectoryTaxonomyReader(dir);
try (Directory dir = newDirectory()) {
touchTaxo(taxoWriter, new FacetLabel("b"));
TaxonomyReader newtr = TaxonomyReader.openIfChanged(taxoReader);
taxoReader.close();
taxoReader = newtr;
assertEquals(1, Integer.parseInt(taxoReader.getCommitUserData().get(DirectoryTaxonomyWriter.INDEX_EPOCH)));
DirectoryTaxonomyWriter taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE_OR_APPEND, NO_OP_CACHE);
touchTaxo(taxoWriter, new FacetLabel("a"));
// now recreate the taxonomy, and check that the epoch is preserved after opening DirTW again.
taxoWriter.close();
taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE, NO_OP_CACHE);
touchTaxo(taxoWriter, new FacetLabel("c"));
taxoWriter.close();
taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE_OR_APPEND, NO_OP_CACHE);
touchTaxo(taxoWriter, new FacetLabel("d"));
taxoWriter.close();
TaxonomyReader taxoReader = new DirectoryTaxonomyReader(dir);
newtr = TaxonomyReader.openIfChanged(taxoReader);
taxoReader.close();
taxoReader = newtr;
assertEquals(2, Integer.parseInt(taxoReader.getCommitUserData().get(DirectoryTaxonomyWriter.INDEX_EPOCH)));
touchTaxo(taxoWriter, new FacetLabel("b"));
taxoReader.close();
dir.close();
TaxonomyReader newtr = TaxonomyReader.openIfChanged(taxoReader);
taxoReader.close();
taxoReader = newtr;
assertEquals(1, Integer.parseInt(taxoReader.getCommitUserData().get(DirectoryTaxonomyWriter.INDEX_EPOCH)));
// now recreate the taxonomy, and check that the epoch is preserved after opening DirTW again.
taxoWriter.close();
assumeFalse("if we are on windows and we have pending deletions we can't execute this test",
Constants.WINDOWS && dir.checkPendingDeletions());
taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE, NO_OP_CACHE);
touchTaxo(taxoWriter, new FacetLabel("c"));
taxoWriter.close();
assumeFalse("if we are on windows and we have pending deletions we can't execute this test",
Constants.WINDOWS && dir.checkPendingDeletions());
taxoWriter = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE_OR_APPEND, NO_OP_CACHE);
touchTaxo(taxoWriter, new FacetLabel("d"));
taxoWriter.close();
newtr = TaxonomyReader.openIfChanged(taxoReader);
taxoReader.close();
taxoReader = newtr;
assertEquals(2, Integer.parseInt(taxoReader.getCommitUserData().get(DirectoryTaxonomyWriter.INDEX_EPOCH)));
taxoReader.close();
}
}
@Test