LUCENE-9406: Add IndexWriterEventListener to track events in IndexWriter (#2342)

This commit is contained in:
zacharymorn 2021-03-02 00:54:08 -08:00 committed by GitHub
parent 8b443420b8
commit 6ba9fe5be3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 160 additions and 0 deletions

View File

@ -379,6 +379,8 @@ public class IndexWriter
private final ReaderPool readerPool;
private final BufferedUpdatesStream bufferedUpdatesStream;
private final IndexWriterEventListener eventListener;
/**
* Counts how many merges have completed; this is used by {@link
* #forceApply(FrozenBufferedUpdates)} to handle concurrently apply deletes/updates with merges
@ -938,6 +940,7 @@ public class IndexWriter
config = conf;
infoStream = config.getInfoStream();
softDeletesEnabled = config.getSoftDeletesField() != null;
eventListener = config.getIndexWriterEventListener();
// obtain the write.lock. If the user configured a timeout,
// we wrap with a sleeper and this might take some time.
writeLock = d.obtainLock(WRITE_LOCK_NAME);
@ -3522,11 +3525,16 @@ public class IndexWriter
infoStream.message(
"IW", "now run merges during commit: " + pointInTimeMerges.segString(directory));
}
eventListener.beginMergeOnFullFlush(pointInTimeMerges);
mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
pointInTimeMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "done waiting for merges during commit");
}
eventListener.endMergeOnFullFlush(pointInTimeMerges);
synchronized (this) {
// we need to call this under lock since mergeFinished above is also called under the IW
// lock

View File

@ -520,4 +520,11 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
this.softDeletesField = softDeletesField;
return this;
}
/** Set event listener to record key events in IndexWriter */
public IndexWriterConfig setIndexWriterEventListener(
final IndexWriterEventListener eventListener) {
this.eventListener = eventListener;
return this;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
/**
* A callback event listener for recording key events happened inside IndexWriter
*
* @lucene.experimental
*/
public interface IndexWriterEventListener {
/** A no-op listener that helps to save null checks */
IndexWriterEventListener NO_OP_LISTENER =
new IndexWriterEventListener() {
@Override
public void beginMergeOnFullFlush(MergePolicy.MergeSpecification merge) {}
@Override
public void endMergeOnFullFlush(MergePolicy.MergeSpecification merge) {}
};
/**
* Invoked at the start of merge on commit
*
* @param merge specification to be tracked
*/
void beginMergeOnFullFlush(MergePolicy.MergeSpecification merge);
/**
* Invoked at the end of merge on commit, due to either merge completed, or merge timed out
* according to {@link IndexWriterConfig#setMaxFullFlushMergeWaitMillis(long)}
*
* @param merge specification to be tracked
*/
void endMergeOnFullFlush(MergePolicy.MergeSpecification merge);
}

View File

@ -106,6 +106,9 @@ public class LiveIndexWriterConfig {
/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
protected volatile long maxFullFlushMergeWaitMillis;
/** The IndexWriter event listener to record key events * */
protected IndexWriterEventListener eventListener;
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
this.analyzer = analyzer;
@ -128,6 +131,7 @@ public class LiveIndexWriterConfig {
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
maxFullFlushMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS;
eventListener = IndexWriterEventListener.NO_OP_LISTENER;
}
/** Returns the default analyzer to use for indexing documents. */
@ -432,6 +436,11 @@ public class LiveIndexWriterConfig {
return maxFullFlushMergeWaitMillis;
}
/** Returns the IndexWriterEventListener callback that tracks the key IndexWriter operations. */
public IndexWriterEventListener getIndexWriterEventListener() {
return eventListener;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -458,6 +467,7 @@ public class LiveIndexWriterConfig {
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
sb.append("maxFullFlushMergeWaitMillis=").append(getMaxFullFlushMergeWaitMillis()).append("\n");
sb.append("eventListener=").append(getIndexWriterEventListener()).append("\n");
return sb.toString();
}
}

View File

@ -388,6 +388,50 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
dir.close();
}
// Test basic semantics of merge on commit and events recording invocation
public void testMergeOnCommitWithEventListener() throws IOException {
Directory dir = newDirectory();
IndexWriter firstWriter =
new IndexWriter(
dir,
newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(NoMergePolicy.INSTANCE));
for (int i = 0; i < 5; i++) {
TestIndexWriter.addDoc(firstWriter);
firstWriter.flush();
}
DirectoryReader firstReader = DirectoryReader.open(firstWriter);
assertEquals(5, firstReader.leaves().size());
firstReader.close();
firstWriter.close(); // When this writer closes, it does not merge on commit.
MockIndexWriterEventListener eventListener = new MockIndexWriterEventListener();
IndexWriterConfig iwc =
newIndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(new MergeOnXMergePolicy(newMergePolicy(), MergeTrigger.COMMIT))
.setMaxFullFlushMergeWaitMillis(Integer.MAX_VALUE)
.setIndexWriterEventListener(eventListener);
IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge.
DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
assertEquals(5, unmergedReader.leaves().size());
unmergedReader.close();
TestIndexWriter.addDoc(writerWithMergePolicy);
assertFalse(eventListener.isEventsRecorded());
writerWithMergePolicy.commit(); // Doc added, do merge on commit.
assertEquals(1, writerWithMergePolicy.getSegmentCount()); //
assertTrue(eventListener.isEventsRecorded());
writerWithMergePolicy.close();
dir.close();
}
private void assertSetters(MergePolicy lmp) {
lmp.setMaxCFSSegmentSizeMB(2.0);
assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
/** Mock IndexWriterEventListener to verify invocation of event methods */
public class MockIndexWriterEventListener implements IndexWriterEventListener {
private boolean beginMergeCalled = false;
private boolean endMergeCalled = false;
@Override
public void beginMergeOnFullFlush(MergePolicy.MergeSpecification merge) {
beginMergeCalled = true;
}
@Override
public void endMergeOnFullFlush(MergePolicy.MergeSpecification merge) {
endMergeCalled = true;
}
public boolean isEventsRecorded() {
return beginMergeCalled && endMergeCalled;
}
}

View File

@ -1007,6 +1007,11 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
if (rarely(r)) {
c.setIndexWriterEventListener(new MockIndexWriterEventListener());
}
c.setMaxFullFlushMergeWaitMillis(rarely() ? atLeast(r, 1000) : atLeast(r, 200));
return c;
}