LUCENE-1022: share writer's infoStream with LogMergePolicy & ConcurrentMergeScheduler

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@587101 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2007-10-22 14:04:31 +00:00
parent 1df7007219
commit 38bbbb51de
3 changed files with 40 additions and 30 deletions

View File

@ -33,8 +33,6 @@ import java.util.ArrayList;
public class ConcurrentMergeScheduler implements MergeScheduler { public class ConcurrentMergeScheduler implements MergeScheduler {
public static boolean VERBOSE = false;
private int mergeThreadPriority = -1; private int mergeThreadPriority = -1;
private List mergeThreads = new ArrayList(); private List mergeThreads = new ArrayList();
@ -44,6 +42,7 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
private Directory dir; private Directory dir;
private boolean closed; private boolean closed;
private IndexWriter writer;
public ConcurrentMergeScheduler() { public ConcurrentMergeScheduler() {
if (allInstances != null) { if (allInstances != null) {
@ -94,7 +93,8 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
} }
private void message(String message) { private void message(String message) {
System.out.println("CMS [" + Thread.currentThread().getName() + "]: " + message); if (writer != null)
writer.message("CMS: " + message);
} }
private synchronized void initMergeThreadPriority() { private synchronized void initMergeThreadPriority() {
@ -110,11 +110,10 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
public synchronized void sync() { public synchronized void sync() {
while(mergeThreadCount() > 0) { while(mergeThreadCount() > 0) {
if (VERBOSE) { message("now wait for threads; currently " + mergeThreads.size() + " still running");
message("now wait for threads; currently " + mergeThreads.size() + " still running"); final int count = mergeThreads.size();
for(int i=0;i<mergeThreads.size();i++) for(int i=0;i<count;i++)
message(" " + i + ": " + ((MergeThread) mergeThreads.get(i))); message(" " + i + ": " + ((MergeThread) mergeThreads.get(i)));
}
try { try {
wait(); wait();
@ -129,6 +128,8 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
public void merge(IndexWriter writer) public void merge(IndexWriter writer)
throws CorruptIndexException, IOException { throws CorruptIndexException, IOException {
this.writer = writer;
initMergeThreadPriority(); initMergeThreadPriority();
dir = writer.getDirectory(); dir = writer.getDirectory();
@ -140,10 +141,8 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
// these newly proposed merges will likely already be // these newly proposed merges will likely already be
// registered. // registered.
if (VERBOSE) { message("now merge");
message("now merge"); message(" index: " + writer.segString());
message(" index: " + writer.segString());
}
// Iterate, pulling from the IndexWriter's queue of // Iterate, pulling from the IndexWriter's queue of
// pending merges, until its empty: // pending merges, until its empty:
@ -155,8 +154,7 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
MergePolicy.OneMerge merge = writer.getNextMerge(); MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null) { if (merge == null) {
if (VERBOSE) message(" no more merges pending; now return");
message(" no more merges pending; now return");
return; return;
} }
@ -164,12 +162,10 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
// deterministic assignment of segment names // deterministic assignment of segment names
writer.mergeInit(merge); writer.mergeInit(merge);
if (VERBOSE) message(" consider merge " + merge.segString(dir));
message(" consider merge " + merge.segString(dir));
if (merge.isExternal) { if (merge.isExternal) {
if (VERBOSE) message(" merge involves segments from an external directory; now run in foreground");
message(" merge involves segments from an external directory; now run in foreground");
} else { } else {
synchronized(this) { synchronized(this) {
if (mergeThreadCount() < maxThreadCount) { if (mergeThreadCount() < maxThreadCount) {
@ -177,8 +173,7 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
// merge: // merge:
MergeThread merger = new MergeThread(writer, merge); MergeThread merger = new MergeThread(writer, merge);
mergeThreads.add(merger); mergeThreads.add(merger);
if (VERBOSE) message(" launch new thread [" + merger.getName() + "]");
message(" launch new thread [" + merger.getName() + "]");
try { try {
merger.setPriority(mergeThreadPriority); merger.setPriority(mergeThreadPriority);
} catch (NullPointerException npe) { } catch (NullPointerException npe) {
@ -187,7 +182,7 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
} }
merger.start(); merger.start();
continue; continue;
} else if (VERBOSE) } else
message(" too many merge threads running; run merge in foreground"); message(" too many merge threads running; run merge in foreground");
} }
} }
@ -225,8 +220,7 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
try { try {
if (VERBOSE) message(" merge thread: start");
message(" merge thread: start");
while(true) { while(true) {
setRunningMerge(merge); setRunningMerge(merge);
@ -237,14 +231,12 @@ public class ConcurrentMergeScheduler implements MergeScheduler {
merge = writer.getNextMerge(); merge = writer.getNextMerge();
if (merge != null) { if (merge != null) {
writer.mergeInit(merge); writer.mergeInit(merge);
if (VERBOSE) message(" merge thread: do another merge " + merge.segString(dir));
message(" merge thread: do another merge " + merge.segString(dir));
} else } else
break; break;
} }
if (VERBOSE) message(" merge thread: done");
message(" merge thread: done");
} catch (Throwable exc) { } catch (Throwable exc) {
// When a merge was aborted & IndexWriter closed, // When a merge was aborted & IndexWriter closed,

View File

@ -304,8 +304,14 @@ public class IndexWriter {
} }
} }
private void message(String message) { /**
infoStream.println("IW " + messageID + " [" + Thread.currentThread().getName() + "]: " + message); * Prints a message to the infoStream (if non-null),
* prefixed with the identifying information for this
* writer and the thread that's calling it.
*/
public void message(String message) {
if (infoStream != null)
infoStream.println("IW " + messageID + " [" + Thread.currentThread().getName() + "]: " + message);
} }
private synchronized void setMessageID() { private synchronized void setMessageID() {

View File

@ -61,6 +61,12 @@ public abstract class LogMergePolicy implements MergePolicy {
private boolean useCompoundFile = true; private boolean useCompoundFile = true;
private boolean useCompoundDocStore = true; private boolean useCompoundDocStore = true;
private IndexWriter writer;
private void message(String message) {
if (writer != null)
writer.message("LMP: " + message);
}
/** <p>Returns the number of segments that are merged at /** <p>Returns the number of segments that are merged at
* once and also controls the total number of segments * once and also controls the total number of segments
@ -211,6 +217,8 @@ public abstract class LogMergePolicy implements MergePolicy {
public MergeSpecification findMerges(SegmentInfos infos, IndexWriter writer) throws IOException { public MergeSpecification findMerges(SegmentInfos infos, IndexWriter writer) throws IOException {
final int numSegments = infos.size(); final int numSegments = infos.size();
this.writer = writer;
message("findMerges: " + numSegments + " segments");
// Compute levels, which is just log (base mergeFactor) // Compute levels, which is just log (base mergeFactor)
// of the size of each segment // of the size of each segment
@ -284,6 +292,7 @@ public abstract class LogMergePolicy implements MergePolicy {
} }
upto--; upto--;
} }
message(" level " + levelBottom + " to " + maxLevel + ": " + (1+upto-start) + " segments");
// Finally, record all merges that are viable at this level: // Finally, record all merges that are viable at this level:
int end = start + mergeFactor; int end = start + mergeFactor;
@ -297,8 +306,11 @@ public abstract class LogMergePolicy implements MergePolicy {
if (!anyTooLarge) { if (!anyTooLarge) {
if (spec == null) if (spec == null)
spec = new MergeSpecification(); spec = new MergeSpecification();
message(" " + start + " to " + end + ": add this merge");
spec.add(new OneMerge(infos.range(start, end), useCompoundFile)); spec.add(new OneMerge(infos.range(start, end), useCompoundFile));
} } else
message(" " + start + " to " + end + ": contains segment over maxMergeSize or maxMergeDocs; skipping");
start = end; start = end;
end = start + mergeFactor; end = start + mergeFactor;
} }