mirror of https://github.com/apache/lucene.git
LUCENE-7301: ensure multiple doc values updates to one document within one update batch are applied in the correct order
This commit is contained in:
parent
78ed1ca879
commit
7b5d82607a
|
@ -137,6 +137,10 @@ Bug Fixes
|
|||
* LUCENE-7293: Don't try to highlight GeoPoint queries (Britta Weber,
|
||||
Nick Knize, Mike McCandless, Uwe Schindler)
|
||||
|
||||
* LUCENE-7301: Multiple doc values updates to the same document within
|
||||
one update batch could be applied in the wrong order resulting in
|
||||
the wrong updated value (Ishan Chattopadhyaya, hossman, Mike McCandless)
|
||||
|
||||
Documentation
|
||||
|
||||
* LUCENE-7223: Improve XXXPoint javadocs to make it clear that you
|
||||
|
|
|
@ -228,14 +228,19 @@ class BufferedUpdatesStream implements Accountable {
|
|||
assert pool.infoIsLive(info);
|
||||
int delCount = 0;
|
||||
final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
|
||||
if (coalescedUpdates != null) {
|
||||
delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
|
||||
applyDocValuesUpdates(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
|
||||
applyDocValuesUpdates(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
|
||||
}
|
||||
|
||||
// first apply segment-private deletes/updates
|
||||
delCount += applyQueryDeletes(packet.queriesIterable(), segState);
|
||||
applyDocValuesUpdates(Arrays.asList(packet.numericDVUpdates), segState, dvUpdates);
|
||||
applyDocValuesUpdates(Arrays.asList(packet.binaryDVUpdates), segState, dvUpdates);
|
||||
|
||||
// ... then coalesced deletes/updates, so that if there is an update that appears in both, the coalesced updates (carried from
|
||||
// updates ahead of the segment-privates ones) win:
|
||||
if (coalescedUpdates != null) {
|
||||
delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
|
||||
applyDocValuesUpdatesList(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
|
||||
applyDocValuesUpdatesList(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
|
||||
}
|
||||
if (dvUpdates.any()) {
|
||||
segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
|
||||
}
|
||||
|
@ -261,8 +266,8 @@ class BufferedUpdatesStream implements Accountable {
|
|||
int delCount = 0;
|
||||
delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
|
||||
DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
|
||||
applyDocValuesUpdates(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
|
||||
applyDocValuesUpdates(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
|
||||
applyDocValuesUpdatesList(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
|
||||
applyDocValuesUpdatesList(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
|
||||
if (dvUpdates.any()) {
|
||||
segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
|
||||
}
|
||||
|
@ -599,8 +604,17 @@ class BufferedUpdatesStream implements Accountable {
|
|||
return delTermVisitedCount;
|
||||
}
|
||||
|
||||
private synchronized void applyDocValuesUpdatesList(List<List<DocValuesUpdate>> updates,
|
||||
SegmentState segState, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
|
||||
// we walk backwards through the segments, appending deletion packets to the coalesced updates, so we must apply the packets in reverse
|
||||
// so that newer packets override older ones:
|
||||
for(int idx=updates.size()-1;idx>=0;idx--) {
|
||||
applyDocValuesUpdates(updates.get(idx), segState, dvUpdatesContainer);
|
||||
}
|
||||
}
|
||||
|
||||
// DocValues updates
|
||||
private synchronized void applyDocValuesUpdates(Iterable<? extends DocValuesUpdate> updates,
|
||||
private synchronized void applyDocValuesUpdates(List<DocValuesUpdate> updates,
|
||||
SegmentState segState, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
|
||||
Fields fields = segState.reader.fields();
|
||||
|
||||
|
|
|
@ -32,8 +32,8 @@ import org.apache.lucene.util.BytesRef;
|
|||
class CoalescedUpdates {
|
||||
final Map<Query,Integer> queries = new HashMap<>();
|
||||
final List<PrefixCodedTerms> terms = new ArrayList<>();
|
||||
final List<NumericDocValuesUpdate> numericDVUpdates = new ArrayList<>();
|
||||
final List<BinaryDocValuesUpdate> binaryDVUpdates = new ArrayList<>();
|
||||
final List<List<DocValuesUpdate>> numericDVUpdates = new ArrayList<>();
|
||||
final List<List<DocValuesUpdate>> binaryDVUpdates = new ArrayList<>();
|
||||
long totalTermCount;
|
||||
|
||||
@Override
|
||||
|
@ -54,16 +54,20 @@ class CoalescedUpdates {
|
|||
queries.put(query, BufferedUpdates.MAX_INT);
|
||||
}
|
||||
|
||||
List<DocValuesUpdate> numericPacket = new ArrayList<>();
|
||||
numericDVUpdates.add(numericPacket);
|
||||
for (NumericDocValuesUpdate nu : in.numericDVUpdates) {
|
||||
NumericDocValuesUpdate clone = new NumericDocValuesUpdate(nu.term, nu.field, (Long) nu.value);
|
||||
clone.docIDUpto = Integer.MAX_VALUE;
|
||||
numericDVUpdates.add(clone);
|
||||
numericPacket.add(clone);
|
||||
}
|
||||
|
||||
List<DocValuesUpdate> binaryPacket = new ArrayList<>();
|
||||
binaryDVUpdates.add(binaryPacket);
|
||||
for (BinaryDocValuesUpdate bu : in.binaryDVUpdates) {
|
||||
BinaryDocValuesUpdate clone = new BinaryDocValuesUpdate(bu.term, bu.field, (BytesRef) bu.value);
|
||||
clone.docIDUpto = Integer.MAX_VALUE;
|
||||
binaryDVUpdates.add(clone);
|
||||
binaryPacket.add(clone);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ abstract class DocValuesUpdate {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "term=" + term + ",field=" + field + ",value=" + value;
|
||||
return "term=" + term + ",field=" + field + ",value=" + value + ",docIDUpto=" + docIDUpto;
|
||||
}
|
||||
|
||||
/** An in-place update to a binary DocValues field */
|
||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.lucene.index;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -58,13 +60,119 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
|||
public class TestNumericDocValuesUpdates extends LuceneTestCase {
|
||||
|
||||
private Document doc(int id) {
|
||||
// make sure we don't set the doc's value to 0, to not confuse with a document that's missing values
|
||||
return doc(id, id +1);
|
||||
}
|
||||
|
||||
private Document doc(int id, long val) {
|
||||
Document doc = new Document();
|
||||
doc.add(new StringField("id", "doc-" + id, Store.NO));
|
||||
// make sure we don't set the doc's value to 0, to not confuse with a document that's missing values
|
||||
doc.add(new NumericDocValuesField("val", id + 1));
|
||||
doc.add(new NumericDocValuesField("val", val));
|
||||
return doc;
|
||||
}
|
||||
|
||||
public void testMultipleUpdatesSameDoc() throws Exception {
|
||||
|
||||
Directory dir = newDirectory();
|
||||
IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
|
||||
|
||||
conf.setMaxBufferedDocs(3); // small number of docs, so use a tiny maxBufferedDocs
|
||||
|
||||
IndexWriter writer = new IndexWriter(dir, conf);
|
||||
|
||||
writer.updateDocument (new Term("id","doc-1"), doc(1, 1000000000L ));
|
||||
writer.updateNumericDocValue(new Term("id","doc-1"), "val", 1000001111L );
|
||||
writer.updateDocument (new Term("id","doc-2"), doc(2, 2000000000L ));
|
||||
writer.updateDocument (new Term("id","doc-2"), doc(2, 2222222222L ));
|
||||
writer.updateNumericDocValue(new Term("id","doc-1"), "val", 1111111111L );
|
||||
writer.commit();
|
||||
|
||||
final DirectoryReader reader = DirectoryReader.open(dir);
|
||||
final IndexSearcher searcher = new IndexSearcher(reader);
|
||||
TopFieldDocs td;
|
||||
|
||||
td = searcher.search(new TermQuery(new Term("id", "doc-1")), 1,
|
||||
new Sort(new SortField("val", SortField.Type.LONG)));
|
||||
assertEquals("doc-1 missing?", 1, td.scoreDocs.length);
|
||||
assertEquals("doc-1 value", 1111111111L, ((FieldDoc)td.scoreDocs[0]).fields[0]);
|
||||
|
||||
td = searcher.search(new TermQuery(new Term("id", "doc-2")), 1,
|
||||
new Sort(new SortField("val", SortField.Type.LONG)));
|
||||
assertEquals("doc-2 missing?", 1, td.scoreDocs.length);
|
||||
assertEquals("doc-2 value", 2222222222L, ((FieldDoc)td.scoreDocs[0]).fields[0]);
|
||||
|
||||
IOUtils.close(reader, writer, dir);
|
||||
}
|
||||
|
||||
public void testBiasedMixOfRandomUpdates() throws Exception {
|
||||
// 3 types of operations: add, updated, updateDV.
|
||||
// rather then randomizing equally, we'll pick (random) cutoffs so each test run is biased,
|
||||
// in terms of some ops happen more often then others
|
||||
final int ADD_CUTOFF = TestUtil.nextInt(random(), 1, 98);
|
||||
final int UPD_CUTOFF = TestUtil.nextInt(random(), ADD_CUTOFF+1, 99);
|
||||
|
||||
Directory dir = newDirectory();
|
||||
IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
|
||||
|
||||
IndexWriter writer = new IndexWriter(dir, conf);
|
||||
|
||||
final int numOperations = atLeast(1000);
|
||||
final Map<Integer,Long> expected = new HashMap<>(numOperations / 3);
|
||||
|
||||
// start with at least one doc before any chance of updates
|
||||
final int numSeedDocs = atLeast(1);
|
||||
for (int i = 0; i < numSeedDocs; i++) {
|
||||
final long val = random().nextLong();
|
||||
expected.put(i, val);
|
||||
writer.addDocument(doc(i, val));
|
||||
}
|
||||
|
||||
int numDocUpdates = 0;
|
||||
int numValueUpdates = 0;
|
||||
|
||||
//System.out.println("TEST: numOperations=" + numOperations + " ADD_CUTOFF=" + ADD_CUTOFF + " UPD_CUTOFF=" + UPD_CUTOFF);
|
||||
|
||||
for (int i = 0; i < numOperations; i++) {
|
||||
final int op = TestUtil.nextInt(random(), 1, 100);
|
||||
final long val = random().nextLong();
|
||||
if (op <= ADD_CUTOFF) {
|
||||
final int id = expected.size();
|
||||
//System.out.println("TEST i=" + i + ": addDocument id=" + id + " val=" + val);
|
||||
expected.put(id, val);
|
||||
writer.addDocument(doc(id, val));
|
||||
} else {
|
||||
final int id = TestUtil.nextInt(random(), 0, expected.size()-1);
|
||||
expected.put(id, val);
|
||||
if (op <= UPD_CUTOFF) {
|
||||
numDocUpdates++;
|
||||
//System.out.println("TEST i=" + i + ": updateDocument id=" + id + " val=" + val);
|
||||
writer.updateDocument(new Term("id","doc-" + id), doc(id, val));
|
||||
} else {
|
||||
numValueUpdates++;
|
||||
//System.out.println("TEST i=" + i + ": updateDV id=" + id + " val=" + val);
|
||||
writer.updateNumericDocValue(new Term("id","doc-" + id), "val", val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writer.commit();
|
||||
|
||||
final DirectoryReader reader = DirectoryReader.open(dir);
|
||||
final IndexSearcher searcher = new IndexSearcher(reader);
|
||||
|
||||
// TODO: make more efficient if max numOperations is going to be increased much
|
||||
for (Map.Entry<Integer,Long> expect : expected.entrySet()) {
|
||||
String id = "doc-" + expect.getKey();
|
||||
TopFieldDocs td = searcher.search(new TermQuery(new Term("id", id)), 1,
|
||||
new Sort(new SortField("val", SortField.Type.LONG)));
|
||||
assertEquals(id + " missing?", 1, td.totalHits);
|
||||
assertEquals(id + " value", expect.getValue(), ((FieldDoc)td.scoreDocs[0]).fields[0]);
|
||||
}
|
||||
|
||||
IOUtils.close(reader, writer, dir);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUpdatesAreFlushed() throws IOException {
|
||||
Directory dir = newDirectory();
|
||||
|
|
Loading…
Reference in New Issue