LUCENE-10118: Improve CMS infostream messages (#337)

Expand the log message when CMS.MergeThread completes its merge operation, 
to include addition useful diagnostic information, like the total-bytes-written, 
the time taken, as well as rate limiter information. Also, while here, unify the 
thread start and end log output to help improve tracing.
This commit is contained in:
Chris Hegarty 2021-09-30 11:43:45 +01:00 committed by GitHub
parent ca810e732d
commit 797cfbf477
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 96 additions and 3 deletions

View File

@ -691,15 +691,30 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
public void run() {
try {
if (verbose()) {
message(" merge thread: start");
message(String.format(Locale.ROOT, "merge thread %s start", this.getName()));
}
doMerge(mergeSource, merge);
if (verbose()) {
message(
String.format(
Locale.ROOT,
"merge thread %s merge segment [%s] done estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s",
this.getName(),
getSegmentName(merge),
bytesToMB(merge.estimatedMergeBytes),
bytesToMB(rateLimiter.getTotalBytesWritten()),
nsToSec(System.nanoTime() - merge.mergeStartNS),
nsToSec(rateLimiter.getTotalStoppedNS()),
nsToSec(rateLimiter.getTotalPausedNS()),
rateToString(rateLimiter.getMBPerSec())));
}
runOnMergeFinished(mergeSource);
if (verbose()) {
message(" merge thread: done");
message(String.format(Locale.ROOT, "merge thread %s end", this.getName()));
}
runOnMergeFinished(mergeSource);
} catch (Throwable exc) {
if (exc instanceof MergePolicy.MergeAbortedException) {
// OK to ignore
@ -878,4 +893,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
private static double bytesToMB(long bytes) {
return bytes / 1024. / 1024.;
}
private static String getSegmentName(MergePolicy.OneMerge merge) {
return merge.info != null ? merge.info.info.name : "_na_";
}
}

View File

@ -21,10 +21,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.CannedTokenStream;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@ -617,6 +621,76 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
dir.close();
}
// LUCENE-10118 : Verify the basic log output from MergeThreads
public void testMergeThreadMessages() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
Set<Thread> mergeThreadSet = ConcurrentHashMap.newKeySet();
ConcurrentMergeScheduler cms =
new ConcurrentMergeScheduler() {
@Override
protected synchronized MergeThread getMergeThread(
MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
MergeThread newMergeThread = super.getMergeThread(mergeSource, merge);
mergeThreadSet.add(newMergeThread);
return newMergeThread;
}
};
iwc.setMergeScheduler(cms);
List<String> messages = new ArrayList<>();
iwc.setInfoStream(
new InfoStream() {
@Override
public void close() {}
@Override
public void message(String component, String message) {
if (component.equals("MS")) messages.add(message);
}
@Override
public boolean isEnabled(String component) {
if (component.equals("MS")) return true;
return false;
}
});
iwc.setMaxBufferedDocs(2);
LogMergePolicy lmp = newLogMergePolicy();
lmp.setMergeFactor(2);
iwc.setMergePolicy(lmp);
IndexWriter w = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new TextField("foo", new CannedTokenStream()));
w.addDocument(doc);
w.addDocument(new Document());
// flush
w.addDocument(new Document());
w.addDocument(new Document());
// flush + merge
w.close();
dir.close();
assertTrue(mergeThreadSet.size() > 0);
for (Thread t : mergeThreadSet) {
t.join();
}
for (Thread t : mergeThreadSet) {
String name = t.getName();
List<String> threadMsgs =
messages.stream()
.filter(line -> line.startsWith("merge thread " + name))
.collect(Collectors.toList());
assertTrue(threadMsgs.size() >= 3);
assertTrue(threadMsgs.get(0).startsWith("merge thread " + name + " start"));
assertTrue(
threadMsgs.stream()
.anyMatch(line -> line.startsWith("merge thread " + name + " merge segment")));
assertTrue(threadMsgs.get(threadMsgs.size() - 1).startsWith("merge thread " + name + " end"));
}
}
public void testDynamicDefaults() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));