mirror of
https://github.com/apache/lucene.git
synced 2025-03-03 23:09:36 +00:00
Fix visibility on member variables in IndexWriter and friends (#1460)
Today it looks like the wild wild west inside IndexWriter and some of its associated classes. This change makes sure all non-final members have private visibility, and methods that are not used outside of IW today are made private unless they were already public. This change also removes some unused or unnecessary members where possible and deletes some dead code left over from previous refactorings.
parent ff4363675e
commit bc4da80776
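
The pattern repeated throughout the diff below: a member that other classes or tests used to read directly becomes private, and a small package-private accessor is added where access is still needed (see the new getPeakActiveBytes()/getPeakNetBytes() in DocumentsWriterFlushControl and isDeleterClosed() in IndexWriter). A minimal sketch of that pattern, with hypothetical names:

    // Illustrative only; Tracker and peakBytes are made-up names, not Lucene API.
    final class Tracker {
      private long peakBytes; // was package-visible, now private

      // package-private accessor keeps test access without exposing the field
      long getPeakBytes() {
        return peakBytes;
      }
    }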
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 
-import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -126,7 +125,6 @@ final class DocumentsWriter implements Closeable, Accountable {
   private volatile boolean pendingChangesInCurrentFullFlush;
 
   final DocumentsWriterPerThreadPool perThreadPool;
-  final FlushPolicy flushPolicy;
   final DocumentsWriterFlushControl flushControl;
 
   DocumentsWriter(FlushNotifications flushNotifications, int indexCreatedVersionMajor, AtomicLong pendingNumDocs, boolean enableTestPoints,
@@ -142,7 +140,6 @@ final class DocumentsWriter implements Closeable, Accountable {
           directory, config, infoStream, deleteQueue, infos,
           pendingNumDocs, enableTestPoints);
     });
-    flushPolicy = config.getFlushPolicy();
     this.pendingNumDocs = pendingNumDocs;
     flushControl = new DocumentsWriterFlushControl(this, config);
     this.flushNotifications = flushNotifications;
@@ -152,7 +149,6 @@ final class DocumentsWriter implements Closeable, Accountable {
     return applyDeleteOrUpdate(q -> q.addDelete(queries));
   }
 
-
   long deleteTerms(final Term... terms) throws IOException {
     return applyDeleteOrUpdate(q -> q.addDelete(terms));
   }
@@ -406,7 +402,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     return hasEvents;
   }
 
-  long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
+  long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs,
                        final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
     boolean hasEvents = preUpdate();
 
@@ -420,7 +416,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       ensureOpen();
       final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
-        seqNo = dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications);
+        seqNo = dwpt.updateDocuments(docs, delNode, flushNotifications);
       } finally {
         if (dwpt.isAborted()) {
           flushControl.doOnAbort(dwpt);
@@ -51,7 +51,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
   private volatile long flushBytes = 0;
   private volatile int numPending = 0;
   private int numDocsSinceStalled = 0; // only with assert
-  final AtomicBoolean flushDeletes = new AtomicBoolean(false);
+  private final AtomicBoolean flushDeletes = new AtomicBoolean(false);
   private boolean fullFlush = false;
   private boolean fullFlushMarkDone = false; // only for assertion that we don't get stale DWPTs from the pool
   // The flushQueue is used to concurrently distribute DWPTs that are ready to be flushed ie. when a full flush is in
@@ -66,13 +66,13 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
   // polling the flushQueue
   private final List<DocumentsWriterPerThread> flushingWriters = new ArrayList<>();
 
-  double maxConfiguredRamBuffer = 0;
-  long peakActiveBytes = 0;// only with assert
-  long peakFlushBytes = 0;// only with assert
-  long peakNetBytes = 0;// only with assert
-  long peakDelta = 0; // only with assert
-  boolean flushByRAMWasDisabled; // only with assert
-  final DocumentsWriterStallControl stallControl;
+  private double maxConfiguredRamBuffer = 0;
+  private long peakActiveBytes = 0;// only with assert
+  private long peakFlushBytes = 0;// only with assert
+  private long peakNetBytes = 0;// only with assert
+  private long peakDelta = 0; // only with assert
+  private boolean flushByRAMWasDisabled; // only with assert
+  final DocumentsWriterStallControl stallControl = new DocumentsWriterStallControl();
   private final DocumentsWriterPerThreadPool perThreadPool;
   private final FlushPolicy flushPolicy;
   private boolean closed = false;
@@ -82,9 +82,8 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
 
   DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
     this.infoStream = config.getInfoStream();
-    this.stallControl = new DocumentsWriterStallControl();
     this.perThreadPool = documentsWriter.perThreadPool;
-    this.flushPolicy = documentsWriter.flushPolicy;
+    this.flushPolicy = config.getFlushPolicy();
     this.config = config;
     this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
     this.documentsWriter = documentsWriter;
@@ -712,4 +711,12 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
     }
     return null;
   }
+
+  long getPeakActiveBytes() {
+    return peakActiveBytes;
+  }
+
+  long getPeakNetBytes() {
+    return peakNetBytes;
+  }
 }
@@ -87,22 +87,22 @@ final class DocumentsWriterPerThread {
 
   static class DocState {
     final DocumentsWriterPerThread docWriter;
-    Analyzer analyzer;
+    final Analyzer analyzer;
     InfoStream infoStream;
     Similarity similarity;
     int docID;
     Iterable<? extends IndexableField> doc;
 
-    DocState(DocumentsWriterPerThread docWriter, InfoStream infoStream) {
+    DocState(DocumentsWriterPerThread docWriter, Analyzer analyzer, InfoStream infoStream) {
       this.docWriter = docWriter;
       this.infoStream = infoStream;
+      this.analyzer = analyzer;
     }
 
     public void clear() {
       // don't hold onto doc nor analyzer, in case it is
       // largish:
       doc = null;
-      analyzer = null;
     }
   }
@@ -150,14 +150,13 @@ final class DocumentsWriterPerThread {
   private final static boolean INFO_VERBOSE = false;
   final Codec codec;
   final TrackingDirectoryWrapper directory;
-  final Directory directoryOrig;
   final DocState docState;
-  final DocConsumer consumer;
+  private final DocConsumer consumer;
   final Counter bytesUsed;
 
   // Updates for our still-in-RAM (to be flushed next) segment
-  final BufferedUpdates pendingUpdates;
-  final SegmentInfo segmentInfo; // Current segment we are working on
+  private final BufferedUpdates pendingUpdates;
+  private final SegmentInfo segmentInfo; // Current segment we are working on
   private boolean aborted = false; // True if we aborted
   private SetOnce<Boolean> flushPending = new SetOnce<>();
   private volatile long lastCommittedBytesUsed;
@@ -180,15 +179,14 @@ final class DocumentsWriterPerThread {
   private int numDeletedDocIds = 0;
 
 
-  public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
+  DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
                                   FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
-    this.directoryOrig = directoryOrig;
     this.directory = new TrackingDirectoryWrapper(directory);
     this.fieldInfos = fieldInfos;
     this.indexWriterConfig = indexWriterConfig;
     this.infoStream = infoStream;
     this.codec = indexWriterConfig.getCodec();
-    this.docState = new DocState(this, infoStream);
+    this.docState = new DocState(this, indexWriterConfig.getAnalyzer(), infoStream);
     this.docState.similarity = indexWriterConfig.getSimilarity();
     this.pendingNumDocs = pendingNumDocs;
     bytesUsed = Counter.newCounter();
@@ -211,11 +209,11 @@ final class DocumentsWriterPerThread {
     consumer = indexWriterConfig.getIndexingChain().getChain(this);
   }
 
-  public FieldInfos.Builder getFieldInfosBuilder() {
+  FieldInfos.Builder getFieldInfosBuilder() {
     return fieldInfos;
   }
 
-  public int getIndexCreatedVersionMajor() {
+  int getIndexCreatedVersionMajor() {
     return indexVersionCreated;
   }
 
@@ -236,11 +234,10 @@ final class DocumentsWriterPerThread {
     }
   }
 
-  public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
+  long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     try {
       testPoint("DocumentsWriterPerThread addDocuments start");
       assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
-      docState.analyzer = analyzer;
       if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
       }
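
Note on the Analyzer plumbing in the hunks above: rather than threading an Analyzer through every updateDocuments(...) call, each DocumentsWriterPerThread now captures the analyzer once from its LiveIndexWriterConfig when it builds the DocState, whose analyzer field is final and no longer nulled in clear(). A rough before/after sketch of the call shape (simplified, not the real signatures):

    // before: the analyzer rode along on every call
    //   dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications);
    // after: it is fixed once at construction time
    //   this.docState = new DocState(this, indexWriterConfig.getAnalyzer(), infoStream);
    //   dwpt.updateDocuments(docs, delNode, flushNotifications);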
@@ -481,7 +478,7 @@ final class DocumentsWriterPerThread {
 
   private final Set<String> filesToDelete = new HashSet<>();
 
-  public Set<String> pendingFilesToDelete() {
+  Set<String> pendingFilesToDelete() {
     return filesToDelete;
   }
 
@@ -345,8 +345,8 @@ final class IndexFileDeleter implements Closeable {
   void ensureOpen() throws AlreadyClosedException {
     writer.ensureOpen(false);
     // since we allow 'closing' state, we must still check this, we could be closing because we hit e.g. OOM
-    if (writer.tragedy.get() != null) {
-      throw new AlreadyClosedException("refusing to delete any files: this IndexWriter hit an unrecoverable exception", writer.tragedy.get());
+    if (writer.getTragicException() != null) {
+      throw new AlreadyClosedException("refusing to delete any files: this IndexWriter hit an unrecoverable exception", writer.getTragicException());
     }
   }
 
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.IntPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -245,7 +246,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   /** Used only for testing. */
   private final boolean enableTestPoints;
 
-  static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
+  private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
 
   /**
    * Name of the write lock in the index.
@@ -277,20 +278,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   public final static int MAX_STORED_STRING_LENGTH = ArrayUtil.MAX_ARRAY_LENGTH / UnicodeUtil.MAX_UTF8_BYTES_PER_CHAR;
 
   // when unrecoverable disaster strikes, we populate this with the reason that we had to close IndexWriter
-  final AtomicReference<Throwable> tragedy = new AtomicReference<>(null);
+  private final AtomicReference<Throwable> tragedy = new AtomicReference<>(null);
 
   private final Directory directoryOrig; // original user directory
   private final Directory directory; // wrapped with additional checks
-  private final Analyzer analyzer; // how to analyze text
 
   private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed
   private volatile long lastCommitChangeCount; // last changeCount that was committed
 
   private List<SegmentCommitInfo> rollbackSegments; // list of segmentInfo we will fallback to if the commit fails
 
-  volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
-  volatile long pendingSeqNo;
-  volatile long pendingCommitChangeCount;
+  private volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
+  private volatile long pendingSeqNo;
+  private volatile long pendingCommitChangeCount;
 
   private Collection<String> filesToCommit;
 
@@ -301,6 +301,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   private final EventQueue eventQueue = new EventQueue(this);
   private final MergeScheduler.MergeSource mergeSource = new IndexWriterMergeSource(this);
 
+  private final ReentrantLock writeDocValuesLock = new ReentrantLock();
+
   static final class EventQueue implements Closeable {
     private volatile boolean closed;
     // we use a semaphore here instead of simply synced methods to allow
@@ -374,10 +376,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
-  final IndexFileDeleter deleter;
+  private final IndexFileDeleter deleter;
 
   // used by forceMerge to note those needing merging
-  private Map<SegmentCommitInfo,Boolean> segmentsToMerge = new HashMap<>();
+  private final Map<SegmentCommitInfo,Boolean> segmentsToMerge = new HashMap<>();
   private int mergeMaxNumSegments;
 
   private Lock writeLock;
@@ -391,23 +393,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
   // Holds all SegmentInfo instances currently involved in
   // merges
-  HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
+  private final HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
 
   private final MergeScheduler mergeScheduler;
-  private Set<SegmentMerger> runningAddIndexesMerges = new HashSet<>();
-  private LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<>();
-  private Set<MergePolicy.OneMerge> runningMerges = new HashSet<>();
-  private List<MergePolicy.OneMerge> mergeExceptions = new ArrayList<>();
+  private final Set<SegmentMerger> runningAddIndexesMerges = new HashSet<>();
+  private final LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<>();
+  private final Set<MergePolicy.OneMerge> runningMerges = new HashSet<>();
+  private final List<MergePolicy.OneMerge> mergeExceptions = new ArrayList<>();
   private long mergeGen;
   private boolean stopMerges; // TODO make sure this is only changed once and never set back to false
   private boolean didMessageState;
-
-  final AtomicInteger flushCount = new AtomicInteger();
-
-  final AtomicInteger flushDeletesCount = new AtomicInteger();
-
+  private final AtomicInteger flushCount = new AtomicInteger();
+  private final AtomicInteger flushDeletesCount = new AtomicInteger();
   private final ReaderPool readerPool;
-  final BufferedUpdatesStream bufferedUpdatesStream;
+  private final BufferedUpdatesStream bufferedUpdatesStream;
 
   /** Counts how many merges have completed; this is used by {@link #forceApply(FrozenBufferedUpdates)}
    * to handle concurrently apply deletes/updates with merges completing. */
@@ -426,8 +424,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * the right to add N docs, before they actually change the index,
    * much like how hotels place an "authorization hold" on your credit
    * card to make sure they can later charge you when you check out. */
-  final AtomicLong pendingNumDocs = new AtomicLong();
-  final boolean softDeletesEnabled;
+  private final AtomicLong pendingNumDocs = new AtomicLong();
+  private final boolean softDeletesEnabled;
 
   private final DocumentsWriter.FlushNotifications flushNotifications = new DocumentsWriter.FlushNotifications() {
     @Override
@@ -561,14 +559,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     boolean success = false;
     synchronized (fullFlushLock) {
       try {
-        // TODO: should we somehow make this available in the returned NRT reader?
-        long seqNo = docWriter.flushAllThreads();
-        if (seqNo < 0) {
-          anyChanges = true;
-          seqNo = -seqNo;
-        } else {
-          anyChanges = false;
-        }
+        // TODO: should we somehow make the seqNo available in the returned NRT reader?
+        anyChanges = docWriter.flushAllThreads() < 0;
         if (anyChanges == false) {
           // prevent double increment since docWriter#doFlush increments the flushcount
           // if we flushed anything.
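
The deleted branch above spells out the contract of DocumentsWriter#flushAllThreads(): a negative return value means changes were flushed, and negating it recovers the actual sequence number. This call site only needs the boolean, hence the bare < 0 test. A small sketch of the convention (illustrative, not the Lucene source):

    // Decode a flushAllThreads()-style result: the sign carries "anything flushed?",
    // the magnitude is the sequence number.
    static long decodeSeqNo(long result) {
      boolean anyChanges = result < 0;
      return anyChanges ? -result : result;
    }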
@@ -602,7 +594,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
           success = true;
         } finally {
           // Done: finish the full flush!
-          assert holdsFullFlushLock();
+          assert Thread.holdsLock(fullFlushLock);
           docWriter.finishFullFlush(success);
           if (success) {
             processEvents(false);
@@ -651,11 +643,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     return docWriter.getFlushingBytes();
   }
 
-
-  private final AtomicBoolean writeDocValuesLock = new AtomicBoolean();
-
   final void writeSomeDocValuesUpdates() throws IOException {
-    if (writeDocValuesLock.compareAndSet(false, true)) {
+    if (writeDocValuesLock.tryLock()) {
       try {
         final double ramBufferSizeMB = config.getRAMBufferSizeMB();
         // If the reader pool is > 50% of our IW buffer, then write the updates:
@@ -709,7 +698,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
           }
         }
       } finally {
-        writeDocValuesLock.set(false);
+        writeDocValuesLock.unlock();
       }
     }
   }
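
The two hunks above swap a hand-rolled AtomicBoolean gate for java.util.concurrent.locks.ReentrantLock: tryLock() preserves the skip-if-busy behavior (at most one thread writes doc-values updates at a time; others simply move on), and unlock() in a finally block guarantees release. A self-contained sketch of the shape, with a hypothetical class name:

    import java.util.concurrent.locks.ReentrantLock;

    final class DocValuesUpdateGate {
      private final ReentrantLock lock = new ReentrantLock();

      void writeSomeUpdates(Runnable work) {
        if (lock.tryLock()) { // returns false instead of blocking when another thread holds it
          try {
            work.run();
          } finally {
            lock.unlock();
          }
        }
      }
    }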
@@ -763,8 +752,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     ensureOpen(true);
   }
 
-  final Codec codec; // for writing new segments
-
   /**
    * Constructs a new IndexWriter per the settings given in <code>conf</code>.
    * If you want to make "live" changes to this writer instance, use
@@ -800,11 +787,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     try {
       directoryOrig = d;
       directory = new LockValidatingDirectoryWrapper(d, writeLock);
-
-      analyzer = config.getAnalyzer();
       mergeScheduler = config.getMergeScheduler();
       mergeScheduler.initialize(infoStream, directoryOrig);
-      codec = config.getCodec();
       OpenMode mode = config.getOpenMode();
       final boolean indexExists;
       final boolean create;
@@ -1196,7 +1180,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   /** Returns the analyzer used by this index. */
   public Analyzer getAnalyzer() {
     ensureOpen();
-    return analyzer;
+    return config.getAnalyzer();
   }
 
   /** If {@link SegmentInfos#getVersion} is below {@code newVersion} then update it to this value.
@@ -1219,13 +1203,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    */
   public synchronized boolean hasDeletions() {
     ensureOpen();
-    if (bufferedUpdatesStream.any()) {
-      return true;
-    }
-    if (docWriter.anyDeletions()) {
-      return true;
-    }
-    if (readerPool.anyDeletions()) {
+    if (bufferedUpdatesStream.any()
+        || docWriter.anyDeletions()
+        || readerPool.anyDeletions()) {
       return true;
     }
     for (final SegmentCommitInfo info : segmentInfos) {
@@ -1349,7 +1329,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     ensureOpen();
     boolean success = false;
     try {
-      final long seqNo = maybeProcessEvents(docWriter.updateDocuments(docs, analyzer, delNode));
+      final long seqNo = maybeProcessEvents(docWriter.updateDocuments(docs, delNode));
       success = true;
       return seqNo;
     } catch (VirtualMachineError tragedy) {
@@ -1363,7 +1343,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
         maybeCloseOnTragicEvent();
       }
     }
-
   }
 
   /**
@@ -1853,7 +1832,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     return flushDeletesCount.get();
   }
 
-  final String newSegmentName() {
+  private final String newSegmentName() {
     // Cannot synchronize on IndexWriter because that causes
     // deadlock
     synchronized(segmentInfos) {
@@ -2148,7 +2127,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
   }
 
-  final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
+  private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
     ensureOpen(false);
     if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) {
       mergeScheduler.merge(mergeSource, trigger);
@@ -2568,18 +2547,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * new files, and increments changeCount, so on
    * close/commit we will write a new segments file, but
    * does NOT bump segmentInfos.version. */
-  synchronized void checkpointNoSIS() throws IOException {
+  private synchronized void checkpointNoSIS() throws IOException {
     changeCount.incrementAndGet();
     deleter.checkpoint(segmentInfos, false);
   }
 
   /** Called internally if any index state has changed. */
-  synchronized void changed() {
+  private synchronized void changed() {
     changeCount.incrementAndGet();
     segmentInfos.changed();
   }
 
-  synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) {
+  private synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) {
     assert packet != null && packet.any();
     long nextGen = bufferedUpdatesStream.push(packet);
     // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
@@ -2683,7 +2662,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   }
 
   private synchronized void resetMergeExceptions() {
-    mergeExceptions = new ArrayList<>();
+    mergeExceptions.clear();
     mergeGen++;
   }
 
@@ -2971,7 +2950,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     // TODO: somehow we should fix this merge so it's
     // abortable so that IW.close(false) is able to stop it
     TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
+    Codec codec = config.getCodec();
     // We set the min version to null for now, it will be set later by SegmentMerger
     SegmentInfo info = new SegmentInfo(directoryOrig, Version.LATEST, null, mergedName, -1,
                                        false, codec, Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), config.getIndexSort());
@@ -3267,7 +3246,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
           infoStream.message("IW", "hit exception during prepareCommit");
         }
       }
-      assert holdsFullFlushLock();
+      assert Thread.holdsLock(fullFlushLock);
       // Done: finish the full flush!
       docWriter.finishFullFlush(flushSuccess);
       doAfterFlush();
@@ -3311,7 +3290,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * Ensures that all changes in the reader-pool are written to disk.
    * @param writeDeletes if <code>true</code> if deletes should be written to disk too.
    */
-  private final void writeReaderPool(boolean writeDeletes) throws IOException {
+  private void writeReaderPool(boolean writeDeletes) throws IOException {
     assert Thread.holdsLock(this);
     if (writeDeletes) {
       if (readerPool.commit(segmentInfos)) {
@@ -3430,10 +3409,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * merged finished, this method may return true right
    * after you had just called {@link #commit}. */
   public final boolean hasUncommittedChanges() {
-    return changeCount.get() != lastCommitChangeCount || docWriter.anyChanges() || bufferedUpdatesStream.any();
+    return changeCount.get() != lastCommitChangeCount || hasChangesInRam();
   }
 
-  private final long commitInternal(MergePolicy mergePolicy) throws IOException {
+  /**
+   * Returns true if there are any changes or deletes that are not flushed or applied.
+   */
+  boolean hasChangesInRam() {
+    return docWriter.anyChanges() || bufferedUpdatesStream.any();
+  }
+
+  private long commitInternal(MergePolicy mergePolicy) throws IOException {
 
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "commit: start");
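
hasUncommittedChanges() now delegates its in-memory half to the new package-private hasChangesInRam(), so a caller that only cares about unflushed or unapplied state can ask for exactly that instead of re-deriving the disjunction. The two-level check at a glance (taken from the hunk above):

    // anything uncommitted = commit counter moved, or something still buffered in RAM
    //   hasUncommittedChanges() == changeCount.get() != lastCommitChangeCount || hasChangesInRam()
    //   hasChangesInRam()       == docWriter.anyChanges() || bufferedUpdatesStream.any()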
@@ -3544,11 +3530,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   // at a time:
   private final Object fullFlushLock = new Object();
 
-  // for assert
-  boolean holdsFullFlushLock() {
-    return Thread.holdsLock(fullFlushLock);
-  }
-
   /** Moves all in-memory segments to the {@link Directory}, but does not commit
    * (fsync) them (call {@link #commit} for that). */
   public final void flush() throws IOException {
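
With the holdsFullFlushLock() helper above removed, call sites assert directly on the monitor, which is all the helper did. Thread.holdsLock is the standard JDK check that the current thread owns an object's monitor; a minimal sketch (hypothetical class, not the IndexWriter source):

    final class FullFlushExample {
      private final Object fullFlushLock = new Object();

      void finishFullFlush() {
        assert Thread.holdsLock(fullFlushLock) : "caller must hold fullFlushLock";
        // ... work that requires the full-flush lock ...
      }
    }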
@@ -3612,7 +3593,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       publishFlushedSegments(true);
       flushSuccess = true;
     } finally {
-      assert holdsFullFlushLock();
+      assert Thread.holdsLock(fullFlushLock);;
       docWriter.finishFullFlush(flushSuccess);
       processEvents(false);
     }
@@ -3643,7 +3624,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
-  final void applyAllDeletesAndUpdates() throws IOException {
+  private void applyAllDeletesAndUpdates() throws IOException {
     assert Thread.holdsLock(this) == false;
     flushDeletesCount.incrementAndGet();
     if (infoStream.isEnabled("IW")) {
@@ -3672,18 +3653,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
-  private void skipDeletedDoc(DocValuesFieldUpdates.Iterator[] updatesIters, int deletedDoc) {
-    for (DocValuesFieldUpdates.Iterator iter : updatesIters) {
-      if (iter.docID() == deletedDoc) {
-        iter.nextDoc();
-      }
-      // when entering the method, all iterators must already be beyond the
-      // deleted document, or right on it, in which case we advance them over
-      // and they must be beyond it now.
-      assert iter.docID() > deletedDoc : "updateDoc=" + iter.docID() + " deletedDoc=" + deletedDoc;
-    }
-  }
-
   /**
    * Carefully merges deletes and updates for the segments we just merged. This
    * is tricky because, although merging will clear all deletes (compacts the
@@ -3694,7 +3663,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * and DV generations for merge.info). If no deletes were flushed, no new
    * deletes file is saved.
    */
-  synchronized private ReadersAndUpdates commitMergedDeletesAndUpdates(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
+  private synchronized ReadersAndUpdates commitMergedDeletesAndUpdates(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
 
     mergeFinishedGen.incrementAndGet();
 
@@ -3882,7 +3851,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   }
 
   @SuppressWarnings("try")
-  synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
+  private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
 
     testPoint("startCommitMerge");
 
@@ -4014,7 +3983,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     return true;
   }
 
-  final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException {
+  private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException {
 
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "handleMergeException: merge=" + segString(merge.segments) + " exc=" + t);
@@ -4102,8 +4071,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   }
 
   /** Hook that's called when the specified merge is complete. */
-  void mergeSuccess(MergePolicy.OneMerge merge) {
-  }
+  protected void mergeSuccess(MergePolicy.OneMerge merge) {}
 
   /** Checks whether this merge involves any segments
    * already participating in a merge. If not, this merge
@@ -4111,7 +4079,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * are now participating in a merge, and true is
    * returned. Else (the merge conflicts) false is
    * returned. */
-  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws IOException {
+  private synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws IOException {
 
     if (merge.registerDone) {
       return true;
@@ -4218,7 +4186,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
-  synchronized private void _mergeInit(MergePolicy.OneMerge merge) throws IOException {
+  private synchronized void _mergeInit(MergePolicy.OneMerge merge) throws IOException {
 
     testPoint("startMergeInit");
 
@@ -4261,7 +4229,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     // names.
     final String mergeSegmentName = newSegmentName();
     // We set the min version to null for now, it will be set later by SegmentMerger
-    SegmentInfo si = new SegmentInfo(directoryOrig, Version.LATEST, null, mergeSegmentName, -1, false, codec,
+    SegmentInfo si = new SegmentInfo(directoryOrig, Version.LATEST, null, mergeSegmentName, -1, false, config.getCodec(),
                                      Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), config.getIndexSort());
     Map<String,String> details = new HashMap<>();
     details.put("mergeMaxNumSegments", "" + merge.maxNumSegments);
@@ -4477,7 +4445,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     MergeState mergeState = merger.mergeState;
     assert mergeState.segmentInfo == merge.info.info;
     merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles()));
+    Codec codec = config.getCodec();
     if (infoStream.isEnabled("IW")) {
       if (merger.shouldMerge()) {
         String pauseInfo = merge.getMergeProgress().getPauseTimes().entrySet()
@@ -4646,7 +4614,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     return merge.info.info.maxDoc();
   }
 
-  synchronized void addMergeException(MergePolicy.OneMerge merge) {
+  private synchronized void addMergeException(MergePolicy.OneMerge merge) {
     assert merge.getException() != null;
     if (!mergeExceptions.contains(merge) && mergeGen == merge.mergeGen) {
       mergeExceptions.add(merge);
@@ -4889,7 +4857,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * hits an unrecoverable exception. This method does not rethrow the tragic event exception.
    * Note: This method will not close the writer but can be called from any location without respecting any lock order
    */
-  final void onTragicEvent(Throwable tragedy, String location) {
+  private void onTragicEvent(Throwable tragedy, String location) {
     // This is not supposed to be tragic: IW is supposed to catch this and
     // ignore, because it means we asked the merge to abort:
     assert tragedy instanceof MergePolicy.MergeAbortedException == false;
@@ -4946,7 +4914,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   //   startCommitMergeDeletes
   //   startMergeInit
   //   DocumentsWriterPerThread addDocuments start
-  private final void testPoint(String message) {
+  private void testPoint(String message) {
     if (enableTestPoints) {
      assert infoStream.isEnabled("TP"); // don't enable unless you need them.
      infoStream.message("TP", message);
@@ -4971,6 +4939,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     return closed;
   }
 
+  boolean isDeleterClosed() {
+    return deleter.isClosed();
+  }
+
   /** Expert: remove any index files that are no longer
    * used.
    *
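
The new isDeleterClosed() above exists so tests stop reaching through the now-private deleter field; the writer exposes only the one bit of state the tests assert on. Usage, as it appears in the test hunks further down:

    // instead of assertTrue(writer.deleter.isClosed()):
    assertTrue(writer.isDeleterClosed());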
@@ -5009,7 +4981,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * deletion files, this SegmentInfo must not reference such files when this
    * method is called, because they are not allowed within a compound file.
    */
-  static final void createCompoundFile(InfoStream infoStream, TrackingDirectoryWrapper directory, final SegmentInfo info, IOContext context, IOUtils.IOConsumer<Collection<String>> deleteFiles) throws IOException {
+  static void createCompoundFile(InfoStream infoStream, TrackingDirectoryWrapper directory, final SegmentInfo info, IOContext context, IOUtils.IOConsumer<Collection<String>> deleteFiles) throws IOException {
 
     // maybe this check is not needed, but why take the risk?
     if (!directory.getCreatedFiles().isEmpty()) {
@@ -5047,7 +5019,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   /**
    * Cleans up residuals from a segment that could not be entirely flushed due to an error
    */
-  private synchronized final void flushFailed(SegmentInfo info) throws IOException {
+  private synchronized void flushFailed(SegmentInfo info) throws IOException {
     // TODO: this really should be a tragic
     Collection<String> files;
     try {
@@ -5070,7 +5042,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * if <code>false</code> the call will try to acquire the queue lock and exits if it's held by another thread.
    *
    */
-  void publishFlushedSegments(boolean forced) throws IOException {
+  private void publishFlushedSegments(boolean forced) throws IOException {
     docWriter.purgeFlushTickets(forced, ticket -> {
       DocumentsWriterPerThread.FlushedSegment newSegment = ticket.getFlushedSegment();
       FrozenBufferedUpdates bufferedUpdates = ticket.getFrozenUpdates();
@@ -5265,7 +5237,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   * {@link IndexWriter#forceApply(FrozenBufferedUpdates)} must be called.
   */
  @SuppressWarnings("try")
-  boolean tryApply(FrozenBufferedUpdates updates) throws IOException {
+  final boolean tryApply(FrozenBufferedUpdates updates) throws IOException {
     if (updates.tryLock()) {
       try {
         forceApply(updates);
@@ -5282,7 +5254,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   * updates, into their actual docIDs in the index, and applies the change. This is a heavy
   * operation and is done concurrently by incoming indexing threads.
   */
-  void forceApply(FrozenBufferedUpdates updates) throws IOException {
+  final void forceApply(FrozenBufferedUpdates updates) throws IOException {
     updates.lock();
     try {
       if (updates.isApplied()) {
@@ -5445,7 +5417,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
         infos = null;
       }
     } else {
-      infos = listOfSegmentCommitInfos();
+      infos = segmentInfos.asList();
     }
     return infos;
   }
@@ -5544,11 +5516,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
-  /** Returns an unmodifiable view of the list of all segments of the current segmentInfos */
-  final synchronized List<SegmentCommitInfo> listOfSegmentCommitInfos() {
-    return segmentInfos.asList();
-  }
-
   /** Tests should use this method to snapshot the current segmentInfos to have a consistent view */
   final synchronized SegmentInfos cloneSegmentInfos() {
     return segmentInfos.clone();
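
With listOfSegmentCommitInfos() removed above, the test call sites below switch to cloneSegmentInfos(), which returns a snapshot (a clone) rather than a live unmodifiable view, so assertions observe a consistent state even while the writer keeps working. The replacement calls, as used in the following test hunks:

    SegmentCommitInfo commitInfo = writer.cloneSegmentInfos().info(0);
    assertEquals(1, writer.cloneSegmentInfos().size());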
@@ -1446,7 +1446,7 @@ public class TestAddIndexes extends LuceneTestCase {
     assertEquals(wrappedReader.numDocs(), writer.getDocStats().numDocs);
     assertEquals(maxDoc, writer.getDocStats().maxDoc);
     writer.commit();
-    SegmentCommitInfo commitInfo = writer.listOfSegmentCommitInfos().get(0);
+    SegmentCommitInfo commitInfo = writer.cloneSegmentInfos().info(0);
     assertEquals(maxDoc-wrappedReader.numDocs(), commitInfo.getSoftDelCount());
     writer.close();
     Directory dir3 = newDirectory();
@@ -115,7 +115,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
           failure.clearDoFail();
           assertTrue(writer.isClosed());
           // Abort should have closed the deleter:
-          assertTrue(writer.deleter.isClosed());
+          assertTrue(writer.isDeleterClosed());
           break outer;
         }
       }
@ -99,7 +99,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||||||
flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
|
flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
|
||||||
assertActiveBytesAfter(flushControl);
|
assertActiveBytesAfter(flushControl);
|
||||||
if (flushPolicy.hasMarkedPending) {
|
if (flushPolicy.hasMarkedPending) {
|
||||||
assertTrue(maxRAMBytes < flushControl.peakActiveBytes);
|
assertTrue(maxRAMBytes < flushControl.getPeakActiveBytes());
|
||||||
}
|
}
|
||||||
if (ensureNotStalled) {
|
if (ensureNotStalled) {
|
||||||
assertFalse(docsWriter.flushControl.stallControl.wasStalled());
|
assertFalse(docsWriter.flushControl.stallControl.wasStalled());
|
||||||
@ -194,8 +194,8 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||||||
assertTrue("peak bytes without flush exceeded watermark",
|
assertTrue("peak bytes without flush exceeded watermark",
|
||||||
flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
|
flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
|
||||||
if (flushPolicy.hasMarkedPending) {
|
if (flushPolicy.hasMarkedPending) {
|
||||||
assertTrue("max: " + maxRAMBytes + " " + flushControl.peakActiveBytes,
|
assertTrue("max: " + maxRAMBytes + " " + flushControl.getPeakActiveBytes(),
|
||||||
maxRAMBytes <= flushControl.peakActiveBytes);
|
maxRAMBytes <= flushControl.getPeakActiveBytes());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertActiveBytesAfter(flushControl);
|
assertActiveBytesAfter(flushControl);
|
||||||
@ -253,7 +253,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||||||
"single thread must not block numThreads: " + numThreads[i],
|
"single thread must not block numThreads: " + numThreads[i],
|
||||||
docsWriter.flushControl.stallControl.hasBlocked());
|
docsWriter.flushControl.stallControl.hasBlocked());
|
||||||
}
|
}
|
||||||
if (docsWriter.flushControl.peakNetBytes > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) {
|
if (docsWriter.flushControl.getPeakNetBytes() > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) {
|
||||||
assertTrue(docsWriter.flushControl.stallControl.wasStalled());
|
assertTrue(docsWriter.flushControl.stallControl.wasStalled());
|
||||||
}
|
}
|
||||||
assertActiveBytesAfter(flushControl);
|
assertActiveBytesAfter(flushControl);
|
||||||
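Note: likewise, peakActiveBytes and peakNetBytes on DocumentsWriterFlushControl are now read through getters. The getter bodies are not part of this diff, so this is a plausible sketch, assuming the fields became private behind trivial accessors:

    // Hypothetical getters in DocumentsWriterFlushControl; only their
    // call sites appear in this commit.
    long getPeakActiveBytes() {
      return peakActiveBytes;
    }

    long getPeakNetBytes() {
      return peakNetBytes;
    }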
@@ -70,7 +70,7 @@ public class TestForceMergeForever extends LuceneTestCase {
        w.addDocument(docs.nextDoc());
      }
      MergePolicy mp = w.getConfig().getMergePolicy();
-      final int mergeAtOnce = 1+w.listOfSegmentCommitInfos().size();
+      final int mergeAtOnce = 1+w.cloneSegmentInfos().size();
      if (mp instanceof TieredMergePolicy) {
        ((TieredMergePolicy) mp).setMaxMergeAtOnce(mergeAtOnce);
      } else if (mp instanceof LogMergePolicy) {
@@ -3448,12 +3448,12 @@ public class TestIndexWriter extends LuceneTestCase {
    d.add(new StringField("id", "doc-1", Field.Store.YES));
    writer.addDocument(d);
    writer.deleteDocuments(new Term("id", "doc-1"));
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
+    assertEquals(1, writer.cloneSegmentInfos().size());
    writer.flush();
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
+    assertEquals(1, writer.cloneSegmentInfos().size());
    writer.commit();
    assertFiles(writer);
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
+    assertEquals(1, writer.cloneSegmentInfos().size());
    IOUtils.close(writer, dir);
  }

@@ -923,7 +923,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
          break;
        }
      }
-      assertTrue(modifier.deleter.isClosed());
+      assertTrue(modifier.isDeleterClosed());

      TestIndexWriter.assertNoUnreferencedFiles(dir, "docsWriter.abort() failed to delete unreferenced files");
      dir.close();
@@ -597,7 +597,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
          // only one flush should fail:
          assertFalse(hitError);
          hitError = true;
-          assertTrue(writer.deleter.isClosed());
+          assertTrue(writer.isDeleterClosed());
          assertTrue(writer.isClosed());
          assertFalse(DirectoryReader.indexExists(dir));

@@ -1292,7 +1292,7 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
      } catch (RuntimeException e) {
        assertTrue(e.getMessage().startsWith(FailOnTermVectors.EXC_MSG));
        // This is an aborting exception, so writer is closed:
-        assertTrue(w.deleter.isClosed());
+        assertTrue(w.isDeleterClosed());
        assertTrue(w.isClosed());
        dir.close();
        continue iters;
@@ -143,7 +143,7 @@ public class TestIndexWriterExceptions2 extends LuceneTestCase {
        }
      } catch (AlreadyClosedException ace) {
        // OK: writer was closed by abort; we just reopen now:
-        assertTrue(iw.deleter.isClosed());
+        assertTrue(iw.isDeleterClosed());
        assertTrue(allowAlreadyClosed);
        allowAlreadyClosed = false;
        conf = newIndexWriterConfig(analyzer);

@@ -177,7 +177,7 @@ public class TestIndexWriterExceptions2 extends LuceneTestCase {
        }
      } catch (AlreadyClosedException ace) {
        // OK: writer was closed by abort; we just reopen now:
-        assertTrue(iw.deleter.isClosed());
+        assertTrue(iw.isDeleterClosed());
        assertTrue(allowAlreadyClosed);
        allowAlreadyClosed = false;
        conf = newIndexWriterConfig(analyzer);

@@ -215,7 +215,7 @@ public class TestIndexWriterExceptions2 extends LuceneTestCase {
        }
      } catch (AlreadyClosedException ace) {
        // OK: writer was closed by abort; we just reopen now:
-        assertTrue(iw.deleter.isClosed());
+        assertTrue(iw.isDeleterClosed());
        assertTrue(allowAlreadyClosed);
        allowAlreadyClosed = false;
        conf = newIndexWriterConfig(analyzer);
@@ -458,7 +458,7 @@ public class TestIndexWriterMaxDocs extends LuceneTestCase {
    dir2.close();
  }

-  public void testTooLargeMaxDocs() throws Exception {
+  public void testTooLargeMaxDocs() {
    expectThrows(IllegalArgumentException.class, () -> {
      IndexWriter.setMaxDocs(Integer.MAX_VALUE);
    });
@@ -550,7 +550,7 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase {
      expectThrows(IOException.class, () -> {
        writer.addDocument(doc);
      });
-      assertTrue(writer.deleter.isClosed());
+      assertTrue(writer.isDeleterClosed());
      assertTrue(writer.isClosed());

      dir.close();
@@ -180,7 +180,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
        writer.commit();
      } catch (AlreadyClosedException ace) {
        // OK: abort closes the writer
-        assertTrue(writer.deleter.isClosed());
+        assertTrue(writer.isDeleterClosed());
      } finally {
        writer.close();
      }

@@ -317,7 +317,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
        success = true;
      } catch (AlreadyClosedException ace) {
        // OK: abort closes the writer
-        assertTrue(writer.deleter.isClosed());
+        assertTrue(writer.isDeleterClosed());
      } catch (IOException ioe) {
        writer.rollback();
        failure.clearDoFail();

@@ -388,7 +388,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
      writer.close();
    });

-    assertTrue(writer.deleter.isClosed());
+    assertTrue(writer.isDeleterClosed());
    dir.close();
  }

@@ -49,14 +49,14 @@ public class TestPerSegmentDeletes extends LuceneTestCase {
    }
    //System.out.println("commit1");
    writer.commit();
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
+    assertEquals(1, writer.cloneSegmentInfos().size());
    for (int x = 5; x < 10; x++) {
      writer.addDocument(DocHelper.createDocument(x, "2", 2));
      //System.out.println("numRamDocs(" + x + ")" + writer.numRamDocs());
    }
    //System.out.println("commit2");
    writer.commit();
-    assertEquals(2, writer.listOfSegmentCommitInfos().size());
+    assertEquals(2, writer.cloneSegmentInfos().size());

    for (int x = 10; x < 15; x++) {
      writer.addDocument(DocHelper.createDocument(x, "3", 2));

@@ -71,12 +71,12 @@ public class TestPerSegmentDeletes extends LuceneTestCase {

    // deletes are now resolved on flush, so there shouldn't be
    // any deletes after flush
-    assertFalse(writer.bufferedUpdatesStream.any());
+    assertFalse(writer.hasChangesInRam());

    // get reader flushes pending deletes
    // so there should not be anymore
    IndexReader r1 = writer.getReader();
-    assertFalse(writer.bufferedUpdatesStream.any());
+    assertFalse(writer.hasChangesInRam());
    r1.close();

    // delete id:2 from the first segment

@@ -90,7 +90,7 @@ public class TestPerSegmentDeletes extends LuceneTestCase {
    fsmp.length = 2;
    writer.maybeMerge();

-    assertEquals(2, writer.listOfSegmentCommitInfos().size());
+    assertEquals(2, writer.cloneSegmentInfos().size());

    // id:2 shouldn't exist anymore because
    // it's been applied in the merge and now it's gone
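Note: the assertion on writer.bufferedUpdatesStream.any() becomes writer.hasChangesInRam(), hiding the buffered-updates stream behind IndexWriter. The method body is not shown in this section, so the following is only an assumption about its shape:

    // Hypothetical: IndexWriter reports whether buffered deletes/updates
    // are still held in RAM instead of exposing bufferedUpdatesStream.
    boolean hasChangesInRam() {
      return bufferedUpdatesStream.any();
    }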
@@ -394,7 +394,7 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
    // We expect any MP to merge these segments into one segment
    // when calling forceMergeDeletes.
    writer.forceMergeDeletes(true);
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
+    assertEquals(1, writer.cloneSegmentInfos().size());
    assertEquals(1, writer.getDocStats().numDocs);
    assertEquals(1, writer.getDocStats().maxDoc);
    writer.close();

@@ -417,7 +417,7 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
      writer.addDocument(d);
    }
    writer.flush();
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
+    assertEquals(1, writer.cloneSegmentInfos().size());

    if (softDelete != null) {
      // the newly created segment should be dropped as it is fully deleted (i.e. only contains deleted docs).

@@ -445,7 +445,7 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
    IndexReader reader = writer.getReader();
    assertEquals(reader.numDocs(), 1);
    reader.close();
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
+    assertEquals(1, writer.cloneSegmentInfos().size());

    writer.close();
    dir.close();

@@ -604,8 +604,8 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
      }
    }
    writer.forceMergeDeletes(true);
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
-    SegmentCommitInfo si = writer.listOfSegmentCommitInfos().get(0);
+    assertEquals(1, writer.cloneSegmentInfos().size());
+    SegmentCommitInfo si = writer.cloneSegmentInfos().info(0);
    assertEquals(0, si.getSoftDelCount()); // hard-delete should supersede the soft-delete
    assertEquals(0, si.getDelCount());
    assertEquals(1, si.info.maxDoc());

@@ -625,8 +625,8 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
    doUpdate(new Term("id", "0"), writer,
        new NumericDocValuesField("soft_delete", 1), new NumericDocValuesField("other-field", 1));
    sm.maybeRefreshBlocking();
-    assertEquals(1, writer.listOfSegmentCommitInfos().size());
-    SegmentCommitInfo si = writer.listOfSegmentCommitInfos().get(0);
+    assertEquals(1, writer.cloneSegmentInfos().size());
+    SegmentCommitInfo si = writer.cloneSegmentInfos().info(0);
    assertEquals(1, si.getSoftDelCount());
    assertEquals(1, si.info.maxDoc());
    IOUtils.close(sm, writer, dir);
@@ -363,9 +363,9 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {

    w.commit(); // want to trigger merge no matter what.

-    assertEquals("There should be exactly one very large and one small segment", 2, w.listOfSegmentCommitInfos().size());
-    SegmentCommitInfo info0 = w.listOfSegmentCommitInfos().get(0);
-    SegmentCommitInfo info1 = w.listOfSegmentCommitInfos().get(1);
+    assertEquals("There should be exactly one very large and one small segment", 2, w.cloneSegmentInfos().size());
+    SegmentCommitInfo info0 = w.cloneSegmentInfos().info(0);
+    SegmentCommitInfo info1 = w.cloneSegmentInfos().info(1);
    int largeSegDocCount = Math.max(info0.info.maxDoc(), info1.info.maxDoc());
    int smallSegDocCount = Math.min(info0.info.maxDoc(), info1.info.maxDoc());
    assertEquals("The large segment should have a bunch of docs", largeSegDocCount, remainingDocs);
@@ -138,7 +138,7 @@ public class TestTragicIndexWriterDeadlock extends LuceneTestCase {

    final IndexWriter w = new IndexWriter(dir, iwc) {
      @Override
-      void mergeSuccess(MergePolicy.OneMerge merge) {
+      protected void mergeSuccess(MergePolicy.OneMerge merge) {
        // tragedy strikes!
        throw new OutOfMemoryError();
      }
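Note: mergeSuccess() widens from package-private to protected, presumably so it stays a supported override point for subclasses after the visibility cleanup. A hedged usage sketch mirroring the anonymous subclass above (the OutOfMemoryError there simulates a tragic event; dir and iwc come from the test):

    // Sketch: hooking merge completion via the protected callback.
    IndexWriter w = new IndexWriter(dir, iwc) {
      @Override
      protected void mergeSuccess(MergePolicy.OneMerge merge) {
        // custom bookkeeping after a successful merge
      }
    };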
@@ -623,7 +623,7 @@ abstract class BaseIndexFileFormatTestCase extends LuceneTestCase {
      } catch (AlreadyClosedException ace) {
        // OK: writer was closed by abort; we just reopen now:
        dir.setRandomIOExceptionRateOnOpen(0.0); // disable exceptions on openInput until next iteration
-        assertTrue(iw.deleter.isClosed());
+        assertTrue(iw.isDeleterClosed());
        assertTrue(allowAlreadyClosed);
        allowAlreadyClosed = false;
        conf = newIndexWriterConfig(analyzer);

@@ -659,7 +659,7 @@ abstract class BaseIndexFileFormatTestCase extends LuceneTestCase {
      } catch (AlreadyClosedException ace) {
        // OK: writer was closed by abort; we just reopen now:
        dir.setRandomIOExceptionRateOnOpen(0.0); // disable exceptions on openInput until next iteration
-        assertTrue(iw.deleter.isClosed());
+        assertTrue(iw.isDeleterClosed());
        assertTrue(allowAlreadyClosed);
        allowAlreadyClosed = false;
        conf = newIndexWriterConfig(analyzer);