mirror of https://github.com/apache/lucene.git
Move DWPT private deletes out of FrozenBufferedUpdates (#1431)
This change moves the deletes tracked by FrozenBufferedUpdates that are private to the DWPT and never used in a global context out of FrozenBufferedUpdates.
commit 47bc18478a
parent 18af6325ed
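In short: delete-by-docID only happens when a DWPT hits an exception while processing a document (see the flush code in the diff below), so these deletes never need to be published globally. Before this change the doc ids were buffered as boxed Integers in BufferedUpdates and carried along into FrozenBufferedUpdates; with it, each DWPT keeps them in a plain int[] of its own and only turns them into a live-docs bitset when the segment is flushed. The following sketch is illustrative only (the class name PrivateDeleteBuffer is hypothetical, not part of the patch); it uses Lucene's ArrayUtil and FixedBitSet, the same utilities the patch uses, to show the buffering pattern:

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.FixedBitSet;

class PrivateDeleteBuffer {                  // hypothetical name, for illustration only
  private int[] deleteDocIDs = new int[0];   // doc ids deleted inside this in-flight segment
  private int numDeletedDocIds = 0;

  // Mark the last docCount documents (of numDocsInRAM buffered docs) as deleted.
  void deleteLastDocs(int numDocsInRAM, int docCount) {
    deleteDocIDs = ArrayUtil.grow(deleteDocIDs, numDeletedDocIds + docCount);
    for (int docId = numDocsInRAM - docCount; docId < numDocsInRAM; docId++) {
      deleteDocIDs[numDeletedDocIds++] = docId;
    }
  }

  // At flush time, turn the buffered deletes into a live-docs bitset.
  FixedBitSet liveDocsOnFlush(int numDocsInRAM) {
    FixedBitSet liveDocs = new FixedBitSet(numDocsInRAM);
    liveDocs.set(0, numDocsInRAM);           // start with every buffered doc live
    for (int i = 0; i < numDeletedDocIds; i++) {
      liveDocs.clear(deleteDocIDs[i]);       // clear the deleted ones
    }
    return liveDocs;
  }
}

Compared with a List<Integer>, the int[] avoids per-entry boxing, so the old BYTES_PER_DEL_DOCID estimate in BufferedUpdates (removed below) is no longer needed.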
BufferedUpdates.java

@@ -17,9 +17,7 @@
package org.apache.lucene.index;

-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,11 +53,6 @@ class BufferedUpdates implements Accountable {
     OBJ_HEADER + INT. */
  final static int BYTES_PER_DEL_TERM = 9*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 7*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 10*Integer.BYTES;

-  /* Rough logic: del docIDs are List<Integer>. Say list
-     allocates ~2X size (2*POINTER). Integer is OBJ_HEADER
-     + int */
-  final static int BYTES_PER_DEL_DOCID = 2*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + Integer.BYTES;
-
  /* Rough logic: HashMap has an array[Entry] w/ varying
     load factor (say 2 * POINTER). Entry is object w/
     Query key, Integer val, int hash, Entry next
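For reference, the removed BYTES_PER_DEL_DOCID constant estimated the heap cost of one buffered delete-by-docID in the old List<Integer> representation: roughly two object references of list slack plus one boxed Integer (object header plus the int itself). A minimal sketch, assuming Lucene's RamUsageEstimator constants and a hypothetical DelDocIdCost class, that prints the same estimate:

import org.apache.lucene.util.RamUsageEstimator;

public class DelDocIdCost {
  public static void main(String[] args) {
    // Same arithmetic as the removed BYTES_PER_DEL_DOCID: ~2x list slack (two refs)
    // plus a boxed Integer (object header + 4-byte int payload).
    int bytesPerDelDocId = 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF
        + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
        + Integer.BYTES;
    System.out.println("estimated bytes per buffered deleted docID: " + bytesPerDelDocId);
  }
}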
@@ -71,7 +64,6 @@ class BufferedUpdates implements Accountable {

  final Map<Term,Integer> deleteTerms = new HashMap<>(); // TODO cut this over to FieldUpdatesBuffer
  final Map<Query,Integer> deleteQueries = new HashMap<>();
-  final List<Integer> deleteDocIDs = new ArrayList<>();

  final Map<String, FieldUpdatesBuffer> fieldUpdates = new HashMap<>();
@@ -96,7 +88,7 @@ class BufferedUpdates implements Accountable {
  public String toString() {
    if (VERBOSE_DELETES) {
      return "gen=" + gen + " numTerms=" + numTermDeletes + ", deleteTerms=" + deleteTerms
-          + ", deleteQueries=" + deleteQueries + ", deleteDocIDs=" + deleteDocIDs + ", fieldUpdates=" + fieldUpdates
+          + ", deleteQueries=" + deleteQueries + ", fieldUpdates=" + fieldUpdates
          + ", bytesUsed=" + bytesUsed;
    } else {
      String s = "gen=" + gen;
@@ -106,9 +98,6 @@ class BufferedUpdates implements Accountable {
      if (deleteQueries.size() != 0) {
        s += " " + deleteQueries.size() + " deleted queries";
      }
-      if (deleteDocIDs.size() != 0) {
-        s += " " + deleteDocIDs.size() + " deleted docIDs";
-      }
      if (numFieldUpdates.get() != 0) {
        s += " " + numFieldUpdates.get() + " field updates";
      }
@@ -128,11 +117,6 @@ class BufferedUpdates implements Accountable {
    }
  }

-  public void addDocID(int docID) {
-    deleteDocIDs.add(Integer.valueOf(docID));
-    bytesUsed.addAndGet(BYTES_PER_DEL_DOCID);
-  }
-
  public void addTerm(Term term, int docIDUpto) {
    Integer current = deleteTerms.get(term);
    if (current != null && docIDUpto < current) {
@@ -185,7 +169,6 @@ class BufferedUpdates implements Accountable {
  void clear() {
    deleteTerms.clear();
    deleteQueries.clear();
-    deleteDocIDs.clear();
    numTermDeletes.set(0);
    numFieldUpdates.set(0);
    fieldUpdates.clear();
@@ -195,16 +178,11 @@ class BufferedUpdates implements Accountable {
  }

  boolean any() {
-    return deleteTerms.size() > 0 || deleteDocIDs.size() > 0 || deleteQueries.size() > 0 || numFieldUpdates.get() > 0;
+    return deleteTerms.size() > 0 || deleteQueries.size() > 0 || numFieldUpdates.get() > 0;
  }

  @Override
  public long ramBytesUsed() {
    return bytesUsed.get() + fieldUpdatesBytesUsed.get() + termsBytesUsed.get();
  }
-
-  void clearDeletedDocIds() {
-    bytesUsed.addAndGet(-deleteDocIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
-    deleteDocIDs.clear();
-  }
}
DocumentsWriterPerThread.java

@@ -35,6 +35,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
+import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
@@ -175,6 +176,9 @@ final class DocumentsWriterPerThread {
  private final boolean enableTestPoints;
  private final int indexVersionCreated;
  private final ReentrantLock lock = new ReentrantLock();
+  private int[] deleteDocIDs = new int[0];
+  private int numDeletedDocIds = 0;

  public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
                                  FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
@@ -202,7 +206,7 @@ final class DocumentsWriterPerThread {
    }
    this.enableTestPoints = enableTestPoints;
    this.indexVersionCreated = indexVersionCreated;
    // this should be the last call in the ctor
    // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
    consumer = indexWriterConfig.getIndexingChain().getChain(this);
  }
@@ -313,9 +317,14 @@ final class DocumentsWriterPerThread {
  // we only mark these docs as deleted and turn it into a livedocs
  // during flush
  private void deleteLastDocs(int docCount) {
-    for (int docId = numDocsInRAM - docCount; docId < numDocsInRAM; docId++) {
-      pendingUpdates.addDocID(docId);
+    int from = numDocsInRAM-docCount;
+    int to = numDocsInRAM;
+    int size = deleteDocIDs.length;
+    deleteDocIDs = ArrayUtil.grow(deleteDocIDs, numDeletedDocIds + (to-from));
+    for (int docId = from; docId < to; docId++) {
+      deleteDocIDs[numDeletedDocIds++] = docId;
    }
+    bytesUsed.addAndGet((deleteDocIDs.length - size) * Integer.SIZE);
    // NOTE: we do not trigger flush here. This is
    // potentially a RAM leak, if you have an app that tries
    // to add docs but every single doc always hits a
@@ -367,14 +376,16 @@ final class DocumentsWriterPerThread {
      // Apply delete-by-docID now (delete-byDocID only
      // happens when an exception is hit processing that
      // doc, eg if analyzer has some problem w/ the text):
-      if (pendingUpdates.deleteDocIDs.size() > 0) {
+      if (numDeletedDocIds > 0) {
        flushState.liveDocs = new FixedBitSet(numDocsInRAM);
        flushState.liveDocs.set(0, numDocsInRAM);
-        for(int delDocID : pendingUpdates.deleteDocIDs) {
-          flushState.liveDocs.clear(delDocID);
+        for (int i = 0; i < numDeletedDocIds; i++) {
+          flushState.liveDocs.clear(deleteDocIDs[i]);
        }
-        flushState.delCountOnFlush = pendingUpdates.deleteDocIDs.size();
-        pendingUpdates.clearDeletedDocIds();
+        flushState.delCountOnFlush = numDeletedDocIds;
+        bytesUsed.addAndGet(-(deleteDocIDs.length * Integer.SIZE));
+        deleteDocIDs = null;
      }

      if (aborted) {
@@ -606,7 +617,7 @@ final class DocumentsWriterPerThread {
  public String toString() {
    return "DocumentsWriterPerThread [pendingDeletes=" + pendingUpdates
      + ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborted=" + aborted + ", numDocsInRAM="
-      + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
+      + numDocsInRAM + ", deleteQueue=" + deleteQueue + ", " + numDeletedDocIds + " deleted docIds" + "]";
  }
FrozenBufferedUpdates.java

@@ -87,8 +87,7 @@ final class FrozenBufferedUpdates {
  public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) {
    this.infoStream = infoStream;
    this.privateSegment = privateSegment;
-    assert updates.deleteDocIDs.isEmpty();
    assert privateSegment == null || updates.deleteTerms.isEmpty() : "segment private packet should only have del queries";
    Term termsArray[] = updates.deleteTerms.keySet().toArray(new Term[updates.deleteTerms.size()]);
    ArrayUtil.timSort(termsArray);
    PrefixCodedTerms.Builder builder = new PrefixCodedTerms.Builder();
@@ -859,5 +858,4 @@ final class FrozenBufferedUpdates {
      return postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
    }
  }
-
}
TestBufferedUpdates.java

@@ -28,11 +28,6 @@ public class TestBufferedUpdates extends LuceneTestCase {
    BufferedUpdates bu = new BufferedUpdates("seg1");
    assertEquals(bu.ramBytesUsed(), 0L);
    assertFalse(bu.any());
-    int docIds = atLeast(1);
-    for (int i = 0; i < docIds; i++) {
-      bu.addDocID(random().nextInt(100));
-    }
-
    int queries = atLeast(1);
    for (int i = 0; i < queries; i++) {
      final int docIDUpto = random().nextBoolean() ? Integer.MAX_VALUE : random().nextInt();
@@ -51,11 +46,6 @@ public class TestBufferedUpdates extends LuceneTestCase {
    long totalUsed = bu.ramBytesUsed();
    assertTrue(totalUsed > 0);

-    bu.clearDeletedDocIds();
-    assertTrue("only docIds are cleaned, buffer shouldn't be empty", bu.any());
-    assertTrue("docIds are cleaned, ram in used should decrease", totalUsed > bu.ramBytesUsed());
-    totalUsed = bu.ramBytesUsed();
-
    bu.clearDeleteTerms();
    assertTrue("only terms and docIds are cleaned, the queries are still in memory", bu.any());
    assertTrue("terms are cleaned, ram in used should decrease", totalUsed > bu.ramBytesUsed());