LUCENE-845, LUCENE-847, LUCENE-870: factor MergePolicy & MergeScheduler out of IndexWriter, improve overall concurrency of IndexWriter, and add ConcurrentMergeScheduler

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@576798 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2007-09-18 09:27:14 +00:00
parent 40f0adb507
commit 299d6357dd
27 changed files with 3050 additions and 635 deletions


@ -0,0 +1,277 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.store.Directory;
import java.io.IOException;
import java.util.List;
import java.util.LinkedList;
import java.util.ArrayList;
/** A {@link MergeScheduler} that runs each merge using a
* separate thread, up until a maximum number of threads
* ({@link #setMaxThreadCount}), at which point merges are
* run in the foreground, serially. This is a simple way
* to use concurrency in the indexing process without
* having to create and manage application level
* threads. */
public class ConcurrentMergeScheduler implements MergeScheduler {
public static boolean VERBOSE = false;
private int mergeThreadPriority = -1;
private List mergeThreads = new ArrayList();
private int maxThreadCount = 3;
private List exceptions = new ArrayList();
private Directory dir;
/** Sets the max # simultaneous threads that may be
* running. If a merge is necessary yet we already have
* this many threads running, the merge is returned back
* to IndexWriter so that it runs in the "foreground". */
public void setMaxThreadCount(int count) {
if (count < 1)
throw new IllegalArgumentException("count should be at least 1");
maxThreadCount = count;
}
/** Get the max # simultaneous threads that may be
* running. @see #setMaxThreadCount. */
public int getMaxThreadCount() {
return maxThreadCount;
}
/** Return the priority that merge threads run at. By
* default the priority is 1 plus the priority of (ie,
* slightly higher priority than) the first thread that
* calls merge. */
public synchronized int getMergeThreadPriority() {
initMergeThreadPriority();
return mergeThreadPriority;
}
/** Set the priority that merge threads run at. */
public synchronized void setMergeThreadPriority(int pri) {
mergeThreadPriority = pri;
final int numThreads = mergeThreads.size();
for(int i=0;i<numThreads;i++) {
MergeThread merge = (MergeThread) mergeThreads.get(i);
try {
merge.setPriority(pri);
} catch (NullPointerException npe) {
// Strangely, Sun's JDK 1.5 on Linux sometimes
// throws NPE out of here...
}
}
}
/** Returns any exceptions that were caught in the merge
* threads. */
public List getExceptions() {
return exceptions;
}
private void message(String message) {
System.out.println("CMS [" + Thread.currentThread().getName() + "]: " + message);
}
private synchronized void initMergeThreadPriority() {
if (mergeThreadPriority == -1)
// Default to slightly higher priority than our
// calling thread
mergeThreadPriority = 1+Thread.currentThread().getPriority();
}
public void close() {}
private synchronized void finishThreads() {
while(mergeThreads.size() > 0) {
if (VERBOSE) {
message("now wait for threads; currently " + mergeThreads.size() + " still running");
for(int i=0;i<mergeThreads.size();i++) {
final MergeThread mergeThread = ((MergeThread) mergeThreads.get(i));
message(" " + i + ": " + mergeThread.merge.segString(dir));
}
}
try {
wait();
} catch (InterruptedException e) {
}
}
}
public void sync() {
finishThreads();
}
// Used for testing
private boolean suppressExceptions;
/** Used for testing */
void setSuppressExceptions() {
suppressExceptions = true;
}
void clearSuppressExceptions() {
suppressExceptions = false;
}
public void merge(IndexWriter writer)
throws CorruptIndexException, IOException {
initMergeThreadPriority();
dir = writer.getDirectory();
// First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not
// involving segments already pending to be merged) to
// the queue. If we are way behind on merging, many of
// these newly proposed merges will likely already be
// registered.
if (VERBOSE) {
message("now merge");
message(" index: " + writer.segString());
}
// Iterate, pulling from the IndexWriter's queue of
// pending merges, until it's empty:
while(true) {
// TODO: we could be careful about which merges to do in
// the BG (eg maybe the "biggest" ones) vs FG, which
// merges to do first (the easiest ones?), etc.
MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null) {
if (VERBOSE)
message(" no more merges pending; now return");
return;
}
// We do this w/ the primary thread to keep
// deterministic assignment of segment names
writer.mergeInit(merge);
if (VERBOSE)
message(" consider merge " + merge.segString(dir));
if (merge.isExternal) {
if (VERBOSE)
message(" merge involves segments from an external directory; now run in foreground");
} else {
synchronized(this) {
if (mergeThreads.size() < maxThreadCount) {
// OK to spawn a new merge thread to handle this
// merge:
MergeThread merger = new MergeThread(writer, merge);
mergeThreads.add(merger);
if (VERBOSE)
message(" launch new thread [" + merger.getName() + "]");
try {
merger.setPriority(mergeThreadPriority);
} catch (NullPointerException npe) {
// Strangely, Sun's JDK 1.5 on Linux sometimes
// throws NPE out of here...
}
merger.start();
continue;
} else if (VERBOSE)
message(" too many merge threads running; run merge in foreground");
}
}
// Too many merge threads already running, so we do
// this in the foreground of the calling thread
writer.merge(merge);
}
}
private class MergeThread extends Thread {
IndexWriter writer;
MergePolicy.OneMerge merge;
public MergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
this.writer = writer;
this.merge = merge;
}
public void run() {
try {
if (VERBOSE)
message(" merge thread: start");
// First time through the while loop we do the merge
// that we were started with:
MergePolicy.OneMerge merge = this.merge;
while(true) {
writer.merge(merge);
// Subsequent times through the loop we do any new
// merge that writer says is necessary:
merge = writer.getNextMerge();
if (merge != null) {
writer.mergeInit(merge);
if (VERBOSE)
message(" merge thread: do another merge " + merge.segString(dir));
} else
break;
}
if (VERBOSE)
message(" merge thread: done");
} catch (Throwable exc) {
// When a merge was aborted & IndexWriter closed,
// it's possible to get various IOExceptions,
// NullPointerExceptions, AlreadyClosedExceptions:
merge.setException(exc);
writer.addMergeException(merge);
if (!merge.isAborted()) {
// If the merge was not aborted then the exception
// is real
exceptions.add(exc);
if (!suppressExceptions)
// suppressExceptions is normally only set during
// testing.
throw new MergePolicy.MergeException(exc);
}
} finally {
synchronized(ConcurrentMergeScheduler.this) {
mergeThreads.remove(this);
ConcurrentMergeScheduler.this.notifyAll();
}
}
}
public String toString() {
return "merge thread: " + merge.segString(dir);
}
}
}
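
For reference, a minimal usage sketch of the new scheduler, modeled on the tests added later in this commit; the RAMDirectory/SimpleAnalyzer setup, the thread-count value, and the document contents are illustrative choices, not part of the change itself:

import org.apache.lucene.analysis.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class ConcurrentMergeSchedulerUsage {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), true);

    // Run merges in background threads; once maxThreadCount merges are
    // already running, further merges fall back to the calling thread.
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    cms.setMaxThreadCount(2);            // illustrative value
    writer.setMergeScheduler(cms);

    Document doc = new Document();
    Field field = new Field("content", "", Field.Store.NO, Field.Index.TOKENIZED);
    doc.add(field);
    for (int i = 0; i < 1000; i++) {
      field.setValue("a b c " + i);
      writer.addDocument(doc);
    }

    writer.close();                       // close() waits for running merges
    if (cms.getExceptions().size() != 0)
      System.out.println("merge exceptions: " + cms.getExceptions());
    dir.close();
  }
}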


@ -113,6 +113,7 @@ final class DocumentsWriter {
private int nextDocID; // Next docID to be added
private int numDocsInRAM; // # docs buffered in RAM
private int numDocsInStore; // # docs written to doc stores
private int nextWriteDocID; // Next docID to be written
// Max # ThreadState instances; if there are more threads
@ -238,6 +239,7 @@ final class DocumentsWriter {
String s = docStoreSegment;
docStoreSegment = null;
docStoreOffset = 0;
numDocsInStore = 0;
return s;
} else {
return null;
@ -245,6 +247,11 @@ final class DocumentsWriter {
}
private List files = null; // Cached list of files we've created
private List abortedFiles = null; // List of files that were written before last abort()
List abortedFiles() {
return abortedFiles;
}
/* Returns list of files in use by this instance,
* including any flushed segments. */
@ -278,6 +285,9 @@ final class DocumentsWriter {
* docs added since last flush. */
synchronized void abort() throws IOException {
if (infoStream != null)
infoStream.println("docWriter: now abort");
// Forcefully remove waiting ThreadStates from line
for(int i=0;i<numWaiting;i++)
waitingThreadStates[i].isIdle = true;
@ -290,6 +300,8 @@ final class DocumentsWriter {
try {
abortedFiles = files();
// Discard pending norms:
final int numField = fieldInfos.size();
for (int i=0;i<numField;i++) {
@ -332,6 +344,7 @@ final class DocumentsWriter {
}
files = null;
} finally {
resumeAllThreads();
}
@ -398,7 +411,7 @@ final class DocumentsWriter {
newFiles = new ArrayList();
docStoreOffset += numDocsInRAM;
docStoreOffset = numDocsInStore;
if (closeDocStore) {
assert docStoreSegment != null;
@ -2119,6 +2132,7 @@ final class DocumentsWriter {
segment = writer.newSegmentName();
numDocsInRAM++;
numDocsInStore++;
// We must at this point commit to flushing to ensure we
// always get N docs when we flush by doc count, even if


@ -105,7 +105,7 @@ final class IndexFileDeleter {
}
private void message(String message) {
infoStream.println(this + " " + Thread.currentThread().getName() + ": " + message);
infoStream.println("Deleter [" + Thread.currentThread().getName() + "]: " + message);
}
/**
@ -275,25 +275,59 @@ final class IndexFileDeleter {
* Writer calls this when it has hit an error and had to
* roll back, to tell us that there may now be
* unreferenced files in the filesystem. So we re-list
* the filesystem and delete such files:
* the filesystem and delete such files. If segmentName
* is non-null, we will only delete files corresponding to
* that segment.
*/
public void refresh() throws IOException {
public void refresh(String segmentName) throws IOException {
String[] files = directory.list();
if (files == null)
throw new IOException("cannot read directory " + directory + ": list() returned null");
IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
String segmentPrefix1;
String segmentPrefix2;
if (segmentName != null) {
segmentPrefix1 = segmentName + ".";
segmentPrefix2 = segmentName + "_";
} else {
segmentPrefix1 = null;
segmentPrefix2 = null;
}
for(int i=0;i<files.length;i++) {
String fileName = files[i];
if (filter.accept(null, fileName) && !refCounts.containsKey(fileName) && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
if (filter.accept(null, fileName) &&
(segmentName == null || fileName.startsWith(segmentPrefix1) || fileName.startsWith(segmentPrefix2)) &&
!refCounts.containsKey(fileName) &&
!fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
// Unreferenced file, so remove it
if (infoStream != null) {
message("refresh: removing newly created unreferenced file \"" + fileName + "\"");
message("refresh [prefix=" + segmentName + "]: removing newly created unreferenced file \"" + fileName + "\"");
}
deleteFile(fileName);
}
}
}
public void refresh() throws IOException {
refresh(null);
}
public void close() throws IOException {
deletePendingFiles();
}
private void deletePendingFiles() throws IOException {
if (deletable != null) {
List oldDeletable = deletable;
deletable = null;
int size = oldDeletable.size();
for(int i=0;i<size;i++) {
deleteFile((String) oldDeletable.get(i));
}
}
}
/**
* For definition of "check point" see IndexWriter comments:
* "Clarification: Check Points (and commits)".
@ -322,19 +356,17 @@ final class IndexFileDeleter {
// Try again now to delete any previously un-deletable
// files (because they were in use, on Windows):
if (deletable != null) {
List oldDeletable = deletable;
deletable = null;
int size = oldDeletable.size();
for(int i=0;i<size;i++) {
deleteFile((String) oldDeletable.get(i));
}
}
deletePendingFiles();
// Incref the files:
incRef(segmentInfos, isCommit);
if (docWriter != null)
incRef(docWriter.files());
final List docWriterFiles;
if (docWriter != null) {
docWriterFiles = docWriter.files();
if (docWriterFiles != null)
incRef(docWriterFiles);
} else
docWriterFiles = null;
if (isCommit) {
// Append to our commits list:
@ -364,9 +396,9 @@ final class IndexFileDeleter {
lastFiles.add(segmentInfo.files());
}
}
if (docWriter != null)
lastFiles.add(docWriter.files());
}
if (docWriterFiles != null)
lastFiles.add(docWriterFiles);
}
void incRef(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
@ -385,7 +417,7 @@ final class IndexFileDeleter {
}
}
private void incRef(List files) throws IOException {
void incRef(List files) throws IOException {
int size = files.size();
for(int i=0;i<size;i++) {
String fileName = (String) files.get(i);
@ -397,7 +429,7 @@ final class IndexFileDeleter {
}
}
private void decRef(List files) throws IOException {
void decRef(List files) throws IOException {
int size = files.size();
for(int i=0;i<size;i++) {
decRef((String) files.get(i));
@ -438,7 +470,22 @@ final class IndexFileDeleter {
return rc;
}
private void deleteFile(String fileName)
void deleteFiles(List files) throws IOException {
final int size = files.size();
for(int i=0;i<size;i++)
deleteFile((String) files.get(i));
}
/** Deletes the specified files, but only if they are new
* (have not yet been incref'd). */
void deleteNewFiles(List files) throws IOException {
final int size = files.size();
for(int i=0;i<size;i++)
if (!refCounts.containsKey(files.get(i)))
deleteFile((String) files.get(i));
}
void deleteFile(String fileName)
throws IOException {
try {
if (infoStream != null) {
@ -490,11 +537,12 @@ final class IndexFileDeleter {
int count;
final private int IncRef() {
final public int IncRef() {
return ++count;
}
final private int DecRef() {
final public int DecRef() {
assert count > 0;
return --count;
}
}


@ -202,6 +202,10 @@ public class IndexModifier {
indexReader = null;
}
indexWriter = new IndexWriter(directory, analyzer, false);
// IndexModifier cannot use ConcurrentMergeScheduler
// because it synchronizes on the directory which can
// cause deadlock
indexWriter.setMergeScheduler(new SerialMergeScheduler());
indexWriter.setInfoStream(infoStream);
indexWriter.setUseCompoundFile(useCompoundFile);
if (maxBufferedDocs != 0)

File diff suppressed because it is too large.


@ -0,0 +1,75 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
/** This is a {@link LogMergePolicy} that measures size of a
* segment as the total byte size of the segment's files. */
public class LogByteSizeMergePolicy extends LogMergePolicy {
/** Default minimum segment size. @see setMinMergeMB */
public static final double DEFAULT_MIN_MERGE_MB = 1.6;
/** Default maximum segment size. A segment of this size
* or larger will never be merged. @see setMaxMergeMB */
public static final double DEFAULT_MAX_MERGE_MB = (double) Long.MAX_VALUE;
public LogByteSizeMergePolicy() {
super();
minMergeSize = (long) (DEFAULT_MIN_MERGE_MB*1024*1024);
maxMergeSize = (long) (DEFAULT_MAX_MERGE_MB*1024*1024);
}
protected long size(SegmentInfo info) throws IOException {
return info.sizeInBytes();
}
/** Sets the maximum size for a segment to be merged.
* When a segment is this size or larger it will never be
* merged. */
public void setMaxMergeMB(double mb) {
maxMergeSize = (long) (mb*1024*1024);
}
/** Get the maximum size for a segment to be merged.
* @see #setMaxMergeMB */
public double getMaxMergeMB() {
return ((double) maxMergeSize)/1024/1024;
}
/** Sets the minimum size for the lowest level segments.
* Any segments below this size are considered to be on
* the same level (even if they vary drastically in size)
* and will be merged whenever there are mergeFactor of
* them. This effectively truncates the "long tail" of
* small segments that would otherwise be created into a
* single level. If you set this too large, it could
* greatly increase the merging cost during indexing (if
* you flush many small segments). */
public void setMinMergeMB(double mb) {
minMergeSize = (long) (mb*1024*1024);
}
/** Get the minimum size for a segment to remain
* un-merged.
* @see #setMinMergeMB **/
public double getMinMergeMB() {
return ((double) minMergeSize)/1024/1024;
}
}
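
A short configuration sketch for this policy; the MB thresholds and merge factor are made-up illustrative values, and the writer.setMergePolicy call is only assumed here since the IndexWriter portion of this diff is suppressed above:

// Illustrative helper; the concrete values are made up.
static LogByteSizeMergePolicy newBatchIndexingPolicy() {
  LogByteSizeMergePolicy policy = new LogByteSizeMergePolicy();
  policy.setMinMergeMB(10.0);     // segments under 10 MB all share the lowest level
  policy.setMaxMergeMB(512.0);    // segments of 512 MB or more are never merged
  policy.setMergeFactor(20);      // batch-style indexing: merge 20 segments at a time
  policy.setUseCompoundFile(true);
  return policy;
}
// writer.setMergePolicy(newBatchIndexingPolicy());  // assumed IndexWriter setter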


@ -0,0 +1,75 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** This is a {@link LogMergePolicy} that measures size of a
* segment as the number of documents (not taking deletions
* into account). */
public class LogDocMergePolicy extends LogMergePolicy {
/** Default minimum segment size. @see setMinMergeDocs */
public static final int DEFAULT_MIN_MERGE_DOCS = 1000;
/** Default maximum segment size. A segment of this size
* or larger will never be merged. @see setMaxMergeDocs */
public static final int DEFAULT_MAX_MERGE_DOCS = Integer.MAX_VALUE;
public LogDocMergePolicy() {
super();
minMergeSize = DEFAULT_MIN_MERGE_DOCS;
maxMergeSize = DEFAULT_MAX_MERGE_DOCS;
}
protected long size(SegmentInfo info) {
return info.docCount;
}
/** Sets the maximum size for a segment to be merged.
* When a segment is this size or larger it will never be
* merged. */
public void setMaxMergeDocs(int maxMergeDocs) {
maxMergeSize = maxMergeDocs;
}
/** Get the maximum size for a segment to be merged.
* @see #setMaxMergeDocs */
public int getMaxMergeDocs() {
return (int) maxMergeSize;
}
/** Sets the minimum size for the lowest level segments.
* Any segments below this size are considered to be on
* the same level (even if they vary drastically in size)
* and will be merged whenever there are mergeFactor of
* them. This effectively truncates the "long tail" of
* small segments that would otherwise be created into a
* single level. If you set this too large, it could
* greatly increase the merging cost during indexing (if
* you flush many small segments). */
public void setMinMergeDocs(int minMergeDocs) {
minMergeSize = minMergeDocs;
}
/** Get the minimum size for a segment to remain
* un-merged.
* @see #setMinMergeDocs **/
public int getMinMergeDocs() {
return (int) minMergeSize;
}
}
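
The doc-count variant is tuned the same way; this sketch mirrors the instanceof-guarded pattern used by the tests later in this commit (the 1000-doc floor is the value those tests use):

static void tuneDocMergePolicy(IndexWriter writer) {
  MergePolicy mp = writer.getMergePolicy();
  if (mp instanceof LogDocMergePolicy) {
    // Raise the floor so segments under 1000 docs land on one level and are
    // merged eagerly, as TestConcurrentMergeScheduler.testDeleteMerging does.
    ((LogDocMergePolicy) mp).setMinMergeDocs(1000);
  }
}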


@ -0,0 +1,303 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import org.apache.lucene.store.Directory;
/** <p>This class implements a {@link MergePolicy} that tries
* to merge segments into levels of exponentially
* increasing size, where each level has < mergeFactor
* segments in it. Whenever a given level has mergeFactor
* segments or more in it, they will be merged.</p>
*
* <p>This class is abstract and requires a subclass to
* define the {@link #size} method which specifies how a
* segment's size is determined. {@link LogDocMergePolicy}
* is one subclass that measures size by document count in
* the segment. {@link LogByteSizeMergePolicy} is another
* subclass that measures size as the total byte size of the
* file(s) for the segment.</p>
*/
public abstract class LogMergePolicy implements MergePolicy {
/** Defines the allowed range of log(size) for each
* level. A level is computed by taking the max segment
* log size, minus LEVEL_LOG_SPAN, and finding all
* segments falling within that range. */
public static final double LEVEL_LOG_SPAN = 0.75;
/** Default merge factor, which is how many segments are
* merged at a time */
public static final int DEFAULT_MERGE_FACTOR = 10;
private int mergeFactor = DEFAULT_MERGE_FACTOR;
long minMergeSize;
long maxMergeSize;
private boolean useCompoundFile = true;
private boolean useCompoundDocStore = true;
/** <p>Returns the number of segments that are merged at
* once and also controls the total number of segments
* allowed to accumulate in the index.</p> */
public int getMergeFactor() {
return mergeFactor;
}
/** Determines how often segment indices are merged by
* addDocument(). With smaller values, less RAM is used
* while indexing, and searches on unoptimized indices are
* faster, but indexing speed is slower. With larger
* values, more RAM is used during indexing, and while
* searches on unoptimized indices are slower, indexing is
* faster. Thus larger values (> 10) are best for batch
* index creation, and smaller values (< 10) for indices
* that are interactively maintained. */
public void setMergeFactor(int mergeFactor) {
if (mergeFactor < 2)
throw new IllegalArgumentException("mergeFactor cannot be less than 2");
this.mergeFactor = mergeFactor;
}
// Javadoc inherited
public boolean useCompoundFile(SegmentInfos infos, SegmentInfo info) {
return useCompoundFile;
}
/** Sets whether compound file format should be used for
* newly flushed and newly merged segments. */
public void setUseCompoundFile(boolean useCompoundFile) {
this.useCompoundFile = useCompoundFile;
}
/** Returns true if newly flushed and newly merged segments
* are written in compound file format. @see
* #setUseCompoundFile */
public boolean getUseCompoundFile() {
return useCompoundFile;
}
// Javadoc inherited
public boolean useCompoundDocStore(SegmentInfos infos) {
return useCompoundDocStore;
}
/** Sets whether compound file format should be used for
* newly flushed and newly merged doc store
* segment files (term vectors and stored fields). */
public void setUseCompoundDocStore(boolean useCompoundDocStore) {
this.useCompoundDocStore = useCompoundDocStore;
}
/** Returns true if newly flushed and newly merged doc
* store segment files (term vectors and stored fields)
* are written in compound file format. @see
* #setUseCompoundDocStore */
public boolean getUseCompoundDocStore() {
return useCompoundDocStore;
}
public void close() {}
abstract protected long size(SegmentInfo info) throws IOException;
private boolean isOptimized(SegmentInfos infos, IndexWriter writer, int maxNumSegments, Set segmentsToOptimize) throws IOException {
final int numSegments = infos.size();
int numToOptimize = 0;
SegmentInfo optimizeInfo = null;
for(int i=0;i<numSegments && numToOptimize <= maxNumSegments;i++) {
final SegmentInfo info = infos.info(i);
if (segmentsToOptimize.contains(info)) {
numToOptimize++;
optimizeInfo = info;
}
}
return numToOptimize <= maxNumSegments &&
(numToOptimize != 1 || isOptimized(writer, optimizeInfo));
}
/** Returns true if this single info is optimized (has no
* pending norms or deletes, is in the same dir as the
* writer, and matches the current compound file setting). */
private boolean isOptimized(IndexWriter writer, SegmentInfo info)
throws IOException {
return !info.hasDeletions() &&
!info.hasSeparateNorms() &&
info.dir == writer.getDirectory() &&
info.getUseCompoundFile() == useCompoundFile;
}
/** Returns the merges necessary to optimize the index.
* This merge policy defines "optimized" to mean only one
* segment in the index, where that segment has no
* deletions pending nor separate norms, and it is in
* compound file format if the current useCompoundFile
* setting is true. This method returns multiple merges
* (mergeFactor at a time) so the {@link MergeScheduler}
* in use may make use of concurrency. */
public MergeSpecification findMergesForOptimize(SegmentInfos infos, IndexWriter writer, int maxNumSegments, Set segmentsToOptimize) throws IOException {
final Directory dir = writer.getDirectory();
MergeSpecification spec;
if (!isOptimized(infos, writer, maxNumSegments, segmentsToOptimize)) {
int numSegments = infos.size();
while(numSegments > 0) {
final SegmentInfo info = infos.info(--numSegments);
if (segmentsToOptimize.contains(info)) {
numSegments++;
break;
}
}
if (numSegments > 0) {
spec = new MergeSpecification();
while (numSegments > 0) {
final int first;
if (numSegments > mergeFactor)
first = numSegments-mergeFactor;
else
first = 0;
if (numSegments > 1 || !isOptimized(writer, infos.info(0)))
spec.add(new OneMerge(infos.range(first, numSegments), useCompoundFile));
numSegments -= mergeFactor;
}
} else
spec = null;
} else
spec = null;
return spec;
}
/** Checks if any merges are now necessary and returns a
* {@link MergePolicy.MergeSpecification} if so. A merge
* is necessary when there are more than {@link
* #setMergeFactor} segments at a given level. When
* multiple levels have too many segments, this method
* will return multiple merges, allowing the {@link
* MergeScheduler} to use concurrency. */
public MergeSpecification findMerges(SegmentInfos infos, IndexWriter writer) throws IOException {
final int numSegments = infos.size();
// Compute levels, which is just log (base mergeFactor)
// of the size of each segment
float[] levels = new float[numSegments];
final float norm = (float) Math.log(mergeFactor);
final Directory directory = writer.getDirectory();
for(int i=0;i<numSegments;i++) {
final SegmentInfo info = infos.info(i);
long size = size(info);
// Refuse to import a segment that's too large
if (size >= maxMergeSize && info.dir != directory)
throw new IllegalArgumentException("Segment is too large (" + size + " vs max size " + maxMergeSize + ")");
// Floor tiny segments
if (size < 1)
size = 1;
levels[i] = (float) Math.log(size)/norm;
}
final float levelFloor;
if (minMergeSize <= 0)
levelFloor = (float) 0.0;
else
levelFloor = (float) (Math.log(minMergeSize)/norm);
// Now, we quantize the log values into levels. The
// first level is any segment whose log size is within
// LEVEL_LOG_SPAN of the max size, or that has such a
// segment "to the right". Then, we find the max of all
// other segments and use that to define the next level
// segment, etc.
MergeSpecification spec = null;
int start = 0;
while(start < numSegments) {
// Find max level of all segments not already
// quantized.
float maxLevel = levels[start];
for(int i=1+start;i<numSegments;i++) {
final float level = levels[i];
if (level > maxLevel)
maxLevel = level;
}
// Now search backwards for the rightmost segment that
// falls into this level:
float levelBottom;
if (maxLevel < levelFloor)
// All remaining segments fall into the min level
levelBottom = -1.0F;
else {
levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN);
// Force a boundary at the level floor
if (levelBottom < levelFloor && maxLevel >= levelFloor)
levelBottom = levelFloor;
}
int upto = numSegments-1;
while(upto >= start) {
if (levels[upto] >= levelBottom) {
break;
}
upto--;
}
// Finally, record all merges that are viable at this level:
int end = start + mergeFactor;
while(end <= 1+upto) {
boolean anyTooLarge = false;
for(int i=start;i<end;i++)
anyTooLarge |= size(infos.info(i)) >= maxMergeSize;
if (!anyTooLarge) {
if (spec == null)
spec = new MergeSpecification();
spec.add(new OneMerge(infos.range(start, end), useCompoundFile));
}
start = end;
end = start + mergeFactor;
}
start = 1+upto;
}
return spec;
}
}
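
To make the level quantization concrete, here is a small standalone sketch that recomputes levels[i] = log(size)/log(mergeFactor) for a few hypothetical segment sizes; the sizes are made up for illustration:

public class LevelDemo {
  public static void main(String[] args) {
    int mergeFactor = 10;                               // DEFAULT_MERGE_FACTOR
    double norm = Math.log(mergeFactor);
    long[] sizes = new long[] {100000, 70000, 9000, 8000, 7500, 90};
    for (int i = 0; i < sizes.length; i++) {
      double level = Math.log(sizes[i]) / norm;
      System.out.println("size=" + sizes[i] + " level=" + level);
    }
    // Segments whose level lies within LEVEL_LOG_SPAN (0.75) of the current
    // maximum are grouped together; once mergeFactor segments accumulate in
    // a group, findMerges proposes a OneMerge over that contiguous range.
  }
}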


@ -0,0 +1,215 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.store.Directory;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
/**
* <p>Expert: a MergePolicy determines the sequence of
* primitive merge operations to be used for overall merge
* and optimize operations.</p>
*
* <p>Whenever the segments in an index have been altered by
* {@link IndexWriter}, either the addition of a newly
* flushed segment, addition of many segments from
* addIndexes* calls, or a previous merge that may now need
* to cascade, {@link IndexWriter} invokes {@link
* #findMerges} to give the MergePolicy a chance to pick
* merges that are now required. This method returns a
* {@link MergeSpecification} instance describing the set of
* merges that should be done, or null if no merges are
* necessary. When IndexWriter.optimize is called, it calls
* {@link #findMergesForOptimize} and the MergePolicy should
* then return the necessary merges.</p>
*
* <p>Note that the policy can return more than one merge at
* a time. In this case, if the writer is using {@link
* SerialMergeScheduler}, the merges will be run
* sequentially but if it is using {@link
* ConcurrentMergeScheduler} they will be run concurrently.</p>
*
* <p>The default MergePolicy is {@link
* LogByteSizeMergePolicy}.</p>
*/
public interface MergePolicy {
/** OneMerge provides the information necessary to perform
* an individual primitive merge operation, resulting in
* a single new segment. The merge spec includes the
* subset of segments to be merged as well as whether the
* new segment should use the compound file format. */
public static class OneMerge {
SegmentInfo info; // used by IndexWriter
boolean mergeDocStores; // used by IndexWriter
boolean optimize; // used by IndexWriter
SegmentInfos segmentsClone; // used by IndexWriter
boolean increfDone; // used by IndexWriter
boolean registerDone; // used by IndexWriter
long mergeGen; // used by IndexWriter
boolean isExternal; // used by IndexWriter
final SegmentInfos segments;
final boolean useCompoundFile;
boolean aborted;
Throwable error;
public OneMerge(SegmentInfos segments, boolean useCompoundFile) {
if (0 == segments.size())
throw new RuntimeException("segments must include at least one segment");
this.segments = segments;
this.useCompoundFile = useCompoundFile;
}
/** Record that an exception occurred while executing
* this merge */
public synchronized void setException(Throwable error) {
this.error = error;
}
/** Retrieve previous exception set by {@link
* #setException}. */
public synchronized Throwable getException() {
return error;
}
/** Mark this merge as aborted. If this is called
* before the merge is committed then the merge will
* not be committed. */
public synchronized void abort() {
aborted = true;
}
/** Returns true if this merge was aborted. */
public synchronized boolean isAborted() {
return aborted;
}
public String segString(Directory dir) {
StringBuffer b = new StringBuffer();
final int numSegments = segments.size();
for(int i=0;i<numSegments;i++) {
if (i > 0) b.append(" ");
b.append(segments.info(i).segString(dir));
}
if (info != null)
b.append(" into " + info.name);
if (optimize)
b.append(" [optimize]");
return b.toString();
}
}
/**
* A MergeSpecification instance provides the information
* necessary to perform multiple merges. It simply
* contains a list of {@link OneMerge} instances.
*/
public static class MergeSpecification implements Cloneable {
/**
* The list of primitive merges ({@link OneMerge} instances) to perform.
*/
public List merges = new ArrayList();
public void add(OneMerge merge) {
merges.add(merge);
}
public String segString(Directory dir) {
StringBuffer b = new StringBuffer();
b.append("MergeSpec:\n");
final int count = merges.size();
for(int i=0;i<count;i++)
b.append(" " + (1+i) + ": " + ((OneMerge) merges.get(i)).segString(dir));
return b.toString();
}
}
/** Exception thrown if there are any problems while
* executing a merge. */
public class MergeException extends RuntimeException {
public MergeException(String message) {
super(message);
}
public MergeException(Throwable exc) {
super(exc);
}
}
/**
* Determine what set of merge operations are now
* necessary on the index. The IndexWriter calls this
* whenever there is a change to the segments. This call
* is always synchronized on the IndexWriter instance so
* only one thread at a time will call this method.
*
* @param segmentInfos the total set of segments in the index
* @param writer IndexWriter instance
*/
MergeSpecification findMerges(SegmentInfos segmentInfos,
IndexWriter writer)
throws CorruptIndexException, IOException;
/**
* Determine what set of merge operations are necessary in
* order to optimize the index. The IndexWriter calls
* this when its optimize() method is called. This call
* is always synchronized on the IndexWriter instance so
* only one thread at a time will call this method.
*
* @param segmentInfos the total set of segments in the index
* @param writer IndexWriter instance
* @param maxSegmentCount requested maximum number of
* segments in the index (currently this is always 1)
* @param segmentsToOptimize contains the specific
* SegmentInfo instances that must be merged away. This
* may be a subset of all SegmentInfos.
*/
MergeSpecification findMergesForOptimize(SegmentInfos segmentInfos,
IndexWriter writer,
int maxSegmentCount,
Set segmentsToOptimize)
throws CorruptIndexException, IOException;
/**
* Release all resources for the policy.
*/
void close();
/**
* Returns true if a newly flushed (not from merge)
* segment should use the compound file format.
*/
boolean useCompoundFile(SegmentInfos segments, SegmentInfo newSegment);
/**
* Returns true if the doc store files should use the
* compound file format.
*/
boolean useCompoundDocStore(SegmentInfos segments);
}
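
As a shape-of-the-interface illustration only, here is a deliberately naive policy sketch that merges the ten oldest segments whenever more than ten exist; it has to live in org.apache.lucene.index because SegmentInfos is package-private, it ignores segmentsToOptimize for simplicity, and LogMergePolicy above is the real strategy:

package org.apache.lucene.index;

import java.io.IOException;
import java.util.Set;

class NaiveMergePolicy implements MergePolicy {

  public MergeSpecification findMerges(SegmentInfos infos, IndexWriter writer)
    throws CorruptIndexException, IOException {
    if (infos.size() <= 10)
      return null;                                       // nothing to merge yet
    MergeSpecification spec = new MergeSpecification();
    // Merge the 10 oldest segments into one, writing a compound file:
    spec.add(new OneMerge(infos.range(0, 10), true));
    return spec;
  }

  public MergeSpecification findMergesForOptimize(SegmentInfos infos, IndexWriter writer,
                                                  int maxSegmentCount, Set segmentsToOptimize)
    throws CorruptIndexException, IOException {
    if (infos.size() <= maxSegmentCount)
      return null;
    // Naively collapse everything into a single segment:
    MergeSpecification spec = new MergeSpecification();
    spec.add(new OneMerge(infos.range(0, infos.size()), true));
    return spec;
  }

  public boolean useCompoundFile(SegmentInfos segments, SegmentInfo newSegment) {
    return true;
  }

  public boolean useCompoundDocStore(SegmentInfos segments) {
    return true;
  }

  public void close() {}
}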


@ -0,0 +1,36 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
/** Expert: {@link IndexWriter} uses an instance
* implementing this interface to execute the merges
* selected by a {@link MergePolicy}. The default
* MergeScheduler is {@link SerialMergeScheduler}. */
public interface MergeScheduler {
/** Run the merges provided by {@link IndexWriter#getNextMerge()}. */
void merge(IndexWriter writer)
throws CorruptIndexException, IOException;
/** Close this MergeScheduler. */
void close()
throws CorruptIndexException, IOException;
}


@ -54,7 +54,7 @@ public class PositionBasedTermVectorMapper extends TermVectorMapper{
/**
* Never ignores positions. This mapper doesn't make much sense unless there are positions
* @return
* @return false
*/
public boolean isIgnoringPositions() {
return false;


@ -65,6 +65,8 @@ final class SegmentInfo {
private List files; // cached list of files that this segment uses
// in the Directory
long sizeInBytes = -1; // total byte size of all of our files (computed on demand)
private int docStoreOffset; // if this segment shares stored fields & vectors, this
// offset is where in that file this segment's docs begin
private String docStoreSegment; // name used to derive fields/vectors file we share with
@ -104,7 +106,7 @@ final class SegmentInfo {
* Copy everything from src SegmentInfo into our instance.
*/
void reset(SegmentInfo src) {
files = null;
clearFiles();
name = src.name;
docCount = src.docCount;
dir = src.dir;
@ -199,6 +201,19 @@ final class SegmentInfo {
}
}
/** Returns the total size in bytes of all files used by
* this segment. */
long sizeInBytes() throws IOException {
if (sizeInBytes == -1) {
List files = files();
final int size = files.size();
sizeInBytes = 0;
for(int i=0;i<size;i++)
sizeInBytes += dir.fileLength((String) files.get(i));
}
return sizeInBytes;
}
boolean hasDeletions()
throws IOException {
// Cases:
@ -231,12 +246,12 @@ final class SegmentInfo {
} else {
delGen++;
}
files = null;
clearFiles();
}
void clearDelGen() {
delGen = NO;
files = null;
clearFiles();
}
public Object clone () {
@ -345,7 +360,7 @@ final class SegmentInfo {
} else {
normGen[fieldIndex]++;
}
files = null;
clearFiles();
}
/**
@ -392,7 +407,7 @@ final class SegmentInfo {
} else {
this.isCompoundFile = NO;
}
files = null;
clearFiles();
}
/**
@ -419,7 +434,7 @@ final class SegmentInfo {
void setDocStoreIsCompoundFile(boolean v) {
docStoreIsCompoundFile = v;
files = null;
clearFiles();
}
String getDocStoreSegment() {
@ -428,7 +443,7 @@ final class SegmentInfo {
void setDocStoreOffset(int offset) {
docStoreOffset = offset;
files = null;
clearFiles();
}
/**
@ -561,4 +576,52 @@ final class SegmentInfo {
}
return files;
}
/* Called whenever any change is made that affects which
* files this segment has. */
private void clearFiles() {
files = null;
sizeInBytes = -1;
}
/** Used for debugging */
public String segString(Directory dir) {
String cfs;
try {
if (getUseCompoundFile())
cfs = "c";
else
cfs = "C";
} catch (IOException ioe) {
cfs = "?";
}
String docStore;
if (docStoreOffset != -1)
docStore = "->" + docStoreSegment;
else
docStore = "";
return name + ":" +
cfs +
(this.dir == dir ? "" : "x") +
docCount + docStore;
}
/** We consider another SegmentInfo instance equal if it
* has the same dir and same name. */
public boolean equals(Object obj) {
SegmentInfo other;
try {
other = (SegmentInfo) obj;
} catch (ClassCastException cce) {
return false;
}
return other.dir == dir && other.name.equals(name);
}
public int hashCode() {
return dir.hashCode() + name.hashCode();
}
}


@ -330,6 +330,9 @@ final class SegmentInfos extends Vector {
public long getGeneration() {
return generation;
}
public long getLastGeneration() {
return lastGeneration;
}
/**
* Current version number from segments file.
@ -661,4 +664,16 @@ final class SegmentInfos extends Vector {
*/
protected abstract Object doBody(String segmentFileName) throws CorruptIndexException, IOException;
}
/**
* Returns a new SegmentInfos containing the SegmentInfo
* instances in the specified range first (inclusive) to
* last (exclusive), so total number of segments returned
* is last-first.
*/
public SegmentInfos range(int first, int last) {
SegmentInfos infos = new SegmentInfos();
infos.addAll(super.subList(first, last));
return infos;
}
}


@ -0,0 +1,42 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.LinkedList;
/** A {@link MergeScheduler} that simply does each merge
* sequentially, using the current thread. */
public class SerialMergeScheduler implements MergeScheduler {
/** Just do the merges in sequence. We do this
* "synchronized" so that even if the application is using
* multiple threads, only one merge may run at a time. */
synchronized public void merge(IndexWriter writer)
throws CorruptIndexException, IOException {
while(true) {
MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null)
break;
writer.merge(merge);
}
}
public void close() {}
}


@ -236,7 +236,7 @@ class DocHelper {
//writer.setUseCompoundFile(false);
writer.addDocument(doc);
writer.flush();
SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
SegmentInfo info = writer.newestSegment();
writer.close();
return info;
}


@ -272,7 +272,6 @@ public class TestAddIndexesNoOptimize extends TestCase {
writer.addIndexesNoOptimize(new Directory[] { aux, aux });
assertEquals(1020, writer.docCount());
assertEquals(2, writer.getSegmentCount());
assertEquals(1000, writer.getDocCount(0));
writer.close();
@ -373,7 +372,7 @@ public class TestAddIndexesNoOptimize extends TestCase {
writer = newWriter(dir, true);
writer.setMaxBufferedDocs(1000);
// add 1000 documents
// add 1000 documents in 1 segment
addDocs(writer, 1000);
assertEquals(1000, writer.docCount());
assertEquals(1, writer.getSegmentCount());


@ -0,0 +1,231 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.analysis.SimpleAnalyzer;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.util._TestUtil;
import org.apache.lucene.util.English;
import junit.framework.TestCase;
import java.io.IOException;
import java.io.File;
public class TestConcurrentMergeScheduler extends TestCase {
private static final Analyzer ANALYZER = new SimpleAnalyzer();
private static class FailOnlyOnFlush extends MockRAMDirectory.Failure {
boolean doFail = false;
public void setDoFail() {
this.doFail = true;
}
public void clearDoFail() {
this.doFail = false;
}
public void eval(MockRAMDirectory dir) throws IOException {
if (doFail) {
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
if ("doFlush".equals(trace[i].getMethodName())) {
//new RuntimeException().printStackTrace(System.out);
throw new IOException("now failing during flush");
}
}
}
}
}
// Make sure running BG merges still work fine even when
// we are hitting exceptions during flushing.
public void testFlushExceptions() throws IOException {
MockRAMDirectory directory = new MockRAMDirectory();
FailOnlyOnFlush failure = new FailOnlyOnFlush();
directory.failOn(failure);
IndexWriter writer = new IndexWriter(directory, ANALYZER, true);
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
writer.setMergeScheduler(cms);
writer.setMaxBufferedDocs(2);
Document doc = new Document();
Field idField = new Field("id", "", Field.Store.YES, Field.Index.UN_TOKENIZED);
doc.add(idField);
for(int i=0;i<10;i++) {
for(int j=0;j<20;j++) {
idField.setValue(Integer.toString(i*20+j));
writer.addDocument(doc);
}
// Even though this won't delete any docs,
// IndexWriter's flush will still make a clone for all
// SegmentInfos on hitting the exception:
writer.deleteDocuments(new Term("id", "1000"));
failure.setDoFail();
try {
writer.flush();
fail("failed to hit IOException");
} catch (IOException ioe) {
failure.clearDoFail();
}
}
assertEquals(0, cms.getExceptions().size());
writer.close();
IndexReader reader = IndexReader.open(directory);
assertEquals(200, reader.numDocs());
reader.close();
directory.close();
}
// Test that deletes committed after a merge started and
// before it finishes, are correctly merged back:
public void testDeleteMerging() throws IOException {
RAMDirectory directory = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(directory, ANALYZER, true);
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
writer.setMergeScheduler(cms);
// Force degenerate merging so we can get a mix of
// merging of segments with and without deletes at the
// start:
((LogDocMergePolicy) writer.getMergePolicy()).setMinMergeDocs(1000);
Document doc = new Document();
Field idField = new Field("id", "", Field.Store.YES, Field.Index.UN_TOKENIZED);
doc.add(idField);
for(int i=0;i<10;i++) {
for(int j=0;j<100;j++) {
idField.setValue(Integer.toString(i*100+j));
writer.addDocument(doc);
}
int delID = i;
while(delID < 100*(1+i)) {
writer.deleteDocuments(new Term("id", ""+delID));
delID += 10;
}
writer.flush();
}
assertEquals(0, cms.getExceptions().size());
writer.close();
IndexReader reader = IndexReader.open(directory);
// Verify that we did not lose any deletes...
assertEquals(450, reader.numDocs());
reader.close();
directory.close();
}
public void testNoExtraFiles() throws IOException {
RAMDirectory directory = new MockRAMDirectory();
for(int pass=0;pass<2;pass++) {
boolean autoCommit = pass==0;
IndexWriter writer = new IndexWriter(directory, autoCommit, ANALYZER, true);
for(int iter=0;iter<7;iter++) {
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
writer.setMergeScheduler(cms);
writer.setMaxBufferedDocs(2);
for(int j=0;j<21;j++) {
Document doc = new Document();
doc.add(new Field("content", "a b c", Field.Store.NO, Field.Index.TOKENIZED));
writer.addDocument(doc);
}
writer.close();
TestIndexWriter.assertNoUnreferencedFiles(directory, "testNoExtraFiles autoCommit=" + autoCommit);
assertEquals(0, cms.getExceptions().size());
// Reopen
writer = new IndexWriter(directory, autoCommit, ANALYZER, false);
}
writer.close();
}
directory.close();
}
public void testNoWaitClose() throws IOException {
RAMDirectory directory = new MockRAMDirectory();
Document doc = new Document();
Field idField = new Field("id", "", Field.Store.YES, Field.Index.UN_TOKENIZED);
doc.add(idField);
for(int pass=0;pass<2;pass++) {
boolean autoCommit = pass==0;
IndexWriter writer = new IndexWriter(directory, autoCommit, ANALYZER, true);
for(int iter=0;iter<10;iter++) {
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
writer.setMergeScheduler(cms);
writer.setMaxBufferedDocs(2);
for(int j=0;j<201;j++) {
idField.setValue(Integer.toString(iter*201+j));
writer.addDocument(doc);
}
int delID = iter*201;
for(int j=0;j<20;j++) {
writer.deleteDocuments(new Term("id", Integer.toString(delID)));
delID += 5;
}
writer.close(false);
assertEquals(0, cms.getExceptions().size());
IndexReader reader = IndexReader.open(directory);
assertEquals((1+iter)*181, reader.numDocs());
reader.close();
// Reopen
writer = new IndexWriter(directory, autoCommit, ANALYZER, false);
}
writer.close();
}
try {
directory.close();
} catch (RuntimeException ioe) {
// MockRAMDirectory will throw IOExceptions when there
// are still open files, which is OK since some merge
// threads may still be running at this point.
}
}
}


@ -168,7 +168,7 @@ public class TestDoc extends TestCase {
Document doc = FileDocument.Document(file);
writer.addDocument(doc);
writer.flush();
return writer.segmentInfos.info(writer.segmentInfos.size()-1);
return writer.newestSegment();
}


@ -62,7 +62,7 @@ public class TestDocumentWriter extends TestCase {
IndexWriter writer = new IndexWriter(dir, analyzer, true);
writer.addDocument(testDoc);
writer.flush();
SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
SegmentInfo info = writer.newestSegment();
writer.close();
//After adding the document, we should be able to read it back in
SegmentReader reader = SegmentReader.get(info);
@ -123,7 +123,7 @@ public class TestDocumentWriter extends TestCase {
writer.addDocument(doc);
writer.flush();
SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
SegmentInfo info = writer.newestSegment();
writer.close();
SegmentReader reader = SegmentReader.get(info);
@ -156,7 +156,7 @@ public class TestDocumentWriter extends TestCase {
writer.addDocument(doc);
writer.flush();
SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
SegmentInfo info = writer.newestSegment();
writer.close();
SegmentReader reader = SegmentReader.get(info);


@ -39,6 +39,7 @@ import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util._TestUtil;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.store.LockFactory;
@ -134,7 +135,6 @@ public class TestIndexWriter extends TestCase
*/
public void testAddIndexOnDiskFull() throws IOException
{
int START_COUNT = 57;
int NUM_DIR = 50;
int END_COUNT = START_COUNT + NUM_DIR*25;
@ -200,6 +200,9 @@ public class TestIndexWriter extends TestCase
for(int iter=0;iter<6;iter++) {
if (debug)
System.out.println("TEST: iter=" + iter);
// Start with 100 bytes more than we are currently using:
long diskFree = diskUsage+100;
@ -229,7 +232,16 @@ public class TestIndexWriter extends TestCase
writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), false);
IOException err = null;
MergeScheduler ms = writer.getMergeScheduler();
for(int x=0;x<2;x++) {
if (ms instanceof ConcurrentMergeScheduler)
// This test intentionally produces exceptions
// in the threads that CMS launches; we don't
// want to pollute test output with these.
if (0 == x)
((ConcurrentMergeScheduler) ms).setSuppressExceptions();
else
((ConcurrentMergeScheduler) ms).clearSuppressExceptions();
// Two loops: first time, limit disk space &
// throw random IOExceptions; second time, no
@ -301,7 +313,7 @@ public class TestIndexWriter extends TestCase
err = e;
if (debug) {
System.out.println(" hit IOException: " + e);
// e.printStackTrace(System.out);
e.printStackTrace(System.out);
}
if (1 == x) {
@ -310,6 +322,10 @@ public class TestIndexWriter extends TestCase
}
}
// Make sure all threads from
// ConcurrentMergeScheduler are done
_TestUtil.syncConcurrentMerges(writer);
if (autoCommit) {
// Whether we succeeded or failed, check that
@ -411,6 +427,12 @@ public class TestIndexWriter extends TestCase
}
writer.close();
// Wait for all BG threads to finish else
// dir.close() will throw IOException because
// there are still open files
_TestUtil.syncConcurrentMerges(ms);
dir.close();
// Try again with 2000 more bytes of free space:
@ -427,21 +449,38 @@ public class TestIndexWriter extends TestCase
*/
public void testAddDocumentOnDiskFull() throws IOException {
boolean debug = false;
for(int pass=0;pass<3;pass++) {
if (debug)
System.out.println("TEST: pass=" + pass);
boolean autoCommit = pass == 0;
boolean doAbort = pass == 2;
long diskFree = 200;
while(true) {
if (debug)
System.out.println("TEST: cycle: diskFree=" + diskFree);
MockRAMDirectory dir = new MockRAMDirectory();
dir.setMaxSizeInBytes(diskFree);
IndexWriter writer = new IndexWriter(dir, autoCommit, new WhitespaceAnalyzer(), true);
MergeScheduler ms = writer.getMergeScheduler();
if (ms instanceof ConcurrentMergeScheduler)
// This test intentionally produces exceptions
// in the threads that CMS launches; we don't
// want to pollute test output with these.
((ConcurrentMergeScheduler) ms).setSuppressExceptions();
boolean hitError = false;
try {
for(int i=0;i<200;i++) {
addDoc(writer);
}
} catch (IOException e) {
// e.printStackTrace();
if (debug) {
System.out.println("TEST: exception on addDoc");
e.printStackTrace(System.out);
}
hitError = true;
}
@ -452,12 +491,17 @@ public class TestIndexWriter extends TestCase
try {
writer.close();
} catch (IOException e) {
// e.printStackTrace();
if (debug) {
System.out.println("TEST: exception on close");
e.printStackTrace(System.out);
}
dir.setMaxSizeInBytes(0);
writer.close();
}
}
_TestUtil.syncConcurrentMerges(ms);
assertNoUnreferencedFiles(dir, "after disk full during addDocument with autoCommit=" + autoCommit);
// Make sure reader can open the index:
@ -468,15 +512,15 @@ public class TestIndexWriter extends TestCase
// Now try again w/ more space:
diskFree += 500;
} else {
_TestUtil.syncConcurrentMerges(writer);
dir.close();
break;
}
}
}
}
public void assertNoUnreferencedFiles(Directory dir, String message) throws IOException {
public static void assertNoUnreferencedFiles(Directory dir, String message) throws IOException {
String[] startFiles = dir.list();
SegmentInfos infos = new SegmentInfos();
infos.read(dir);
@ -544,7 +588,7 @@ public class TestIndexWriter extends TestCase
dir.close();
}
private String arrayToString(String[] l) {
static String arrayToString(String[] l) {
String s = "";
for(int i=0;i<l.length;i++) {
if (i > 0) {
@ -1107,12 +1151,14 @@ public class TestIndexWriter extends TestCase
RAMDirectory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(10);
int lastNumFile = dir.list().length;
long lastGen = -1;
for(int j=1;j<52;j++) {
Document doc = new Document();
doc.add(new Field("field", "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED));
writer.addDocument(doc);
_TestUtil.syncConcurrentMerges(writer);
long gen = SegmentInfos.generationFromSegmentsFileName(SegmentInfos.getCurrentSegmentFileName(dir.list()));
if (j == 1)
lastGen = gen;
@ -1153,7 +1199,6 @@ public class TestIndexWriter extends TestCase
public void testDiverseDocs() throws IOException {
RAMDirectory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
// writer.setInfoStream(System.out);
long t0 = System.currentTimeMillis();
writer.setRAMBufferSizeMB(0.5);
Random rand = new Random(31415);
@ -1348,6 +1393,48 @@ public class TestIndexWriter extends TestCase
assertEquals(2, reader.numDocs());
}
// Test calling optimize(false) whereby optimize is kicked
// off but we don't wait for it to finish (but
// writer.close() does wait)
public void testBackgroundOptimize() throws IOException {
Directory dir = new MockRAMDirectory();
for(int pass=0;pass<2;pass++) {
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMergeScheduler(new ConcurrentMergeScheduler());
Document doc = new Document();
doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
writer.setMaxBufferedDocs(2);
writer.setMergeFactor(101);
for(int i=0;i<200;i++)
writer.addDocument(doc);
writer.optimize(false);
if (0 == pass) {
writer.close();
IndexReader reader = IndexReader.open(dir);
assertTrue(reader.isOptimized());
reader.close();
} else {
// Get another segment to flush so we can verify it is
// NOT included in the optimization
writer.addDocument(doc);
writer.addDocument(doc);
writer.close();
IndexReader reader = IndexReader.open(dir);
assertTrue(!reader.isOptimized());
reader.close();
SegmentInfos infos = new SegmentInfos();
infos.read(dir);
assertEquals(2, infos.size());
}
}
dir.close();
}
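In application terms, the behavior this test verifies boils down to the minimal sketch below (field contents and writer settings are illustrative): optimize(false) returns immediately while the ConcurrentMergeScheduler threads do the work, and close() blocks until that background optimize completes.

Directory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMergeScheduler(new ConcurrentMergeScheduler());
writer.setMaxBufferedDocs(2);
Document doc = new Document();
doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED));
for (int i = 0; i < 200; i++)
  writer.addDocument(doc);
writer.optimize(false);    // kick off the optimize and return immediately
// ... the application may keep adding documents here; as the second pass of
// the test shows, those later flushes are not folded into the running optimize ...
writer.close();            // waits for the background optimize to finish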
private void rmDir(File dir) {
File[] files = dir.listFiles();
if (files != null) {

View File

@ -24,6 +24,7 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util._TestUtil;
import junit.framework.TestCase;
@ -73,13 +74,19 @@ public class TestIndexWriterMergePolicy extends TestCase {
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(10);
writer.setMergeFactor(10);
MergePolicy mp = writer.getMergePolicy();
if (mp instanceof LogDocMergePolicy)
((LogDocMergePolicy) mp).setMinMergeDocs(100);
for (int i = 0; i < 100; i++) {
addDoc(writer);
writer.close();
writer = new IndexWriter(dir, new WhitespaceAnalyzer(), false);
mp = writer.getMergePolicy();
writer.setMaxBufferedDocs(10);
if (mp instanceof LogDocMergePolicy)
((LogDocMergePolicy) mp).setMinMergeDocs(100);
writer.setMergeFactor(10);
checkInvariants(writer);
}
@ -191,6 +198,7 @@ public class TestIndexWriterMergePolicy extends TestCase {
}
private void checkInvariants(IndexWriter writer) throws IOException {
_TestUtil.syncConcurrentMerges(writer);
int maxBufferedDocs = writer.getMaxBufferedDocs();
int mergeFactor = writer.getMergeFactor();
int maxMergeDocs = writer.getMaxMergeDocs();

View File

@ -16,7 +16,7 @@ package org.apache.lucene.index;
*/
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@ -37,8 +37,8 @@ public class TestIndexWriterMerging extends TestCase
int num=100;
Directory indexA = new RAMDirectory();
Directory indexB = new RAMDirectory();
Directory indexA = new MockRAMDirectory();
Directory indexB = new MockRAMDirectory();
fillIndex(indexA, 0, num);
boolean fail = verifyIndex(indexA, 0);
@ -54,7 +54,7 @@ public class TestIndexWriterMerging extends TestCase
fail("Index b is invalid");
}
Directory merged = new RAMDirectory();
Directory merged = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(merged, new StandardAnalyzer(), true);
writer.setMergeFactor(2);
@ -85,6 +85,7 @@ public class TestIndexWriterMerging extends TestCase
System.out.println("Document " + (i + startAt) + " is returning document " + temp.getField("count").stringValue());
}
}
reader.close();
return fail;
}

View File

@ -32,82 +32,84 @@ import java.io.File;
public class TestStressIndexing extends TestCase {
private static final Analyzer ANALYZER = new SimpleAnalyzer();
private static final Random RANDOM = new Random();
private static Searcher SEARCHER;
private static int RUN_TIME_SEC = 15;
private static class IndexerThread extends Thread {
IndexWriter modifier;
int nextID;
public int count;
private static abstract class TimedThread extends Thread {
boolean failed;
int count;
private static int RUN_TIME_SEC = 6;
private TimedThread[] allThreads;
public IndexerThread(IndexWriter modifier) {
this.modifier = modifier;
abstract public void doWork() throws Throwable;
TimedThread(TimedThread[] threads) {
this.allThreads = threads;
}
public void run() {
long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
try {
while(true) {
final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
if (System.currentTimeMillis() > stopTime) {
break;
count = 0;
try {
while(System.currentTimeMillis() < stopTime && !anyErrors()) {
doWork();
count++;
}
} catch (Throwable e) {
e.printStackTrace(System.out);
failed = true;
}
}
private boolean anyErrors() {
for(int i=0;i<allThreads.length;i++)
if (allThreads[i] != null && allThreads[i].failed)
return true;
return false;
}
}
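Because the removed and added lines are interleaved above, the new TimedThread base class is easier to read consolidated; the sketch below is reconstructed from the added lines (same fields and constants as in the diff), with comments added:

private static abstract class TimedThread extends Thread {
  boolean failed;
  int count;
  private static int RUN_TIME_SEC = 6;
  private TimedThread[] allThreads;

  abstract public void doWork() throws Throwable;

  TimedThread(TimedThread[] threads) {
    this.allThreads = threads;
  }

  public void run() {
    final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
    count = 0;
    try {
      // Keep calling doWork() until the time budget is used up or any
      // sibling thread has already failed:
      while(System.currentTimeMillis() < stopTime && !anyErrors()) {
        doWork();
        count++;
      }
    } catch (Throwable e) {
      e.printStackTrace(System.out);
      failed = true;
    }
  }

  private boolean anyErrors() {
    for(int i=0;i<allThreads.length;i++)
      if (allThreads[i] != null && allThreads[i].failed)
        return true;
    return false;
  }
}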
private static class IndexerThread extends TimedThread {
IndexWriter writer;
public int count;
int nextID;
public IndexerThread(IndexWriter writer, TimedThread[] threads) {
super(threads);
this.writer = writer;
}
public void doWork() throws Exception {
// Add 10 docs:
for(int j=0; j<10; j++) {
Document d = new Document();
int n = RANDOM.nextInt();
d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED));
d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
modifier.addDocument(d);
writer.addDocument(d);
}
// Delete 5 docs:
int deleteID = nextID;
int deleteID = nextID-1;
for(int j=0; j<5; j++) {
modifier.deleteDocuments(new Term("id", ""+deleteID));
writer.deleteDocuments(new Term("id", ""+deleteID));
deleteID -= 2;
}
count++;
}
} catch (Exception e) {
System.out.println(e.toString());
e.printStackTrace();
failed = true;
}
}
}
private static class SearcherThread extends Thread {
private static class SearcherThread extends TimedThread {
private Directory directory;
public int count;
boolean failed;
public SearcherThread(Directory directory) {
public SearcherThread(Directory directory, TimedThread[] threads) {
super(threads);
this.directory = directory;
}
public void run() {
long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
try {
while(true) {
for (int i=0; i<100; i++) {
public void doWork() throws Throwable {
for (int i=0; i<100; i++)
(new IndexSearcher(directory)).close();
}
count += 100;
if (System.currentTimeMillis() > stopTime) {
break;
}
}
} catch (Exception e) {
System.out.println(e.toString());
e.printStackTrace();
failed = true;
}
}
}
@ -115,22 +117,34 @@ public class TestStressIndexing extends TestCase {
Run two indexers and two searchers against a single index as a
stress test.
*/
public void runStressTest(Directory directory) throws Exception {
IndexWriter modifier = new IndexWriter(directory, ANALYZER, true);
public void runStressTest(Directory directory, boolean autoCommit, MergeScheduler mergeScheduler) throws Exception {
IndexWriter modifier = new IndexWriter(directory, autoCommit, ANALYZER, true);
modifier.setMaxBufferedDocs(10);
TimedThread[] threads = new TimedThread[4];
if (mergeScheduler != null)
modifier.setMergeScheduler(mergeScheduler);
// One modifier that writes 10 docs then removes 5, over
// and over:
IndexerThread indexerThread = new IndexerThread(modifier);
IndexerThread indexerThread = new IndexerThread(modifier, threads);
threads[0] = indexerThread;
indexerThread.start();
IndexerThread indexerThread2 = new IndexerThread(modifier);
IndexerThread indexerThread2 = new IndexerThread(modifier, threads);
threads[2] = indexerThread2;
indexerThread2.start();
// Two searchers that constantly just re-instantiate the searcher:
SearcherThread searcherThread1 = new SearcherThread(directory);
// Two searchers that constantly just re-instantiate the
// searcher:
SearcherThread searcherThread1 = new SearcherThread(directory, threads);
threads[1] = searcherThread1;
searcherThread1.start();
SearcherThread searcherThread2 = new SearcherThread(directory);
SearcherThread searcherThread2 = new SearcherThread(directory, threads);
threads[3] = searcherThread2;
searcherThread2.start();
indexerThread.join();
@ -144,6 +158,7 @@ public class TestStressIndexing extends TestCase {
assertTrue("hit unexpected exception in indexer2", !indexerThread2.failed);
assertTrue("hit unexpected exception in search1", !searcherThread1.failed);
assertTrue("hit unexpected exception in search2", !searcherThread2.failed);
//System.out.println(" Writer: " + indexerThread.count + " iterations");
//System.out.println("Searcher 1: " + searcherThread1.count + " searchers created");
//System.out.println("Searcher 2: " + searcherThread2.count + " searchers created");
@ -155,25 +170,38 @@ public class TestStressIndexing extends TestCase {
*/
public void testStressIndexAndSearching() throws Exception {
// First in a RAM directory:
// RAMDir
Directory directory = new MockRAMDirectory();
runStressTest(directory);
runStressTest(directory, true, null);
directory.close();
// Second in an FSDirectory:
// FSDir
String tempDir = System.getProperty("java.io.tmpdir");
File dirPath = new File(tempDir, "lucene.test.stress");
directory = FSDirectory.getDirectory(dirPath);
runStressTest(directory);
runStressTest(directory, true, null);
directory.close();
rmDir(dirPath);
}
private void rmDir(File dir) {
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
files[i].delete();
}
dir.delete();
// With ConcurrentMergeScheduler, in RAMDir
directory = new MockRAMDirectory();
runStressTest(directory, true, new ConcurrentMergeScheduler());
directory.close();
// With ConcurrentMergeScheduler, in FSDir
directory = FSDirectory.getDirectory(dirPath);
runStressTest(directory, true, new ConcurrentMergeScheduler());
directory.close();
// With ConcurrentMergeScheduler and autoCommit=false, in RAMDir
directory = new MockRAMDirectory();
runStressTest(directory, false, new ConcurrentMergeScheduler());
directory.close();
// With ConcurrentMergeScheduler and autoCommit=false, in FSDir
directory = FSDirectory.getDirectory(dirPath);
runStressTest(directory, false, new ConcurrentMergeScheduler());
directory.close();
_TestUtil.rmDir(dirPath);
}
}

View File

@ -0,0 +1,160 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.analysis.SimpleAnalyzer;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.util._TestUtil;
import org.apache.lucene.util.English;
import junit.framework.TestCase;
import java.io.IOException;
import java.io.File;
public class TestThreadedOptimize extends TestCase {
private static final Analyzer ANALYZER = new SimpleAnalyzer();
private final static int NUM_THREADS = 3;
//private final static int NUM_THREADS = 5;
private final static int NUM_ITER = 2;
//private final static int NUM_ITER = 10;
private final static int NUM_ITER2 = 2;
//private final static int NUM_ITER2 = 5;
private boolean failed;
private void setFailed() {
failed = true;
}
public void runTest(Directory directory, boolean autoCommit, MergeScheduler merger) throws Exception {
IndexWriter writer = new IndexWriter(directory, autoCommit, ANALYZER, true);
writer.setMaxBufferedDocs(2);
if (merger != null)
writer.setMergeScheduler(merger);
for(int iter=0;iter<NUM_ITER;iter++) {
final int iterFinal = iter;
writer.setMergeFactor(1000);
for(int i=0;i<200;i++) {
Document d = new Document();
d.add(new Field("id", Integer.toString(i), Field.Store.YES, Field.Index.UN_TOKENIZED));
d.add(new Field("contents", English.intToEnglish(i), Field.Store.NO, Field.Index.TOKENIZED));
writer.addDocument(d);
}
writer.setMergeFactor(4);
//writer.setInfoStream(System.out);
final int docCount = writer.docCount();
Thread[] threads = new Thread[NUM_THREADS];
for(int i=0;i<NUM_THREADS;i++) {
final int iFinal = i;
final IndexWriter writerFinal = writer;
threads[i] = new Thread() {
public void run() {
try {
for(int j=0;j<NUM_ITER2;j++) {
writerFinal.optimize(false);
for(int k=0;k<17*(1+iFinal);k++) {
Document d = new Document();
d.add(new Field("id", iterFinal + "_" + iFinal + "_" + j + "_" + k, Field.Store.YES, Field.Index.UN_TOKENIZED));
d.add(new Field("contents", English.intToEnglish(iFinal+k), Field.Store.NO, Field.Index.TOKENIZED));
writerFinal.addDocument(d);
}
for(int k=0;k<9*(1+iFinal);k++)
writerFinal.deleteDocuments(new Term("id", iterFinal + "_" + iFinal + "_" + j + "_" + k));
writerFinal.optimize();
}
} catch (Throwable t) {
setFailed();
System.out.println(Thread.currentThread().getName() + ": hit exception");
t.printStackTrace(System.out);
}
}
};
}
for(int i=0;i<NUM_THREADS;i++)
threads[i].start();
for(int i=0;i<NUM_THREADS;i++)
threads[i].join();
assertTrue(!failed);
final int expectedDocCount = (int) ((1+iter)*(200+8*NUM_ITER2*(NUM_THREADS/2.0)*(1+NUM_THREADS)));
// System.out.println("TEST: now index=" + writer.segString());
assertEquals(expectedDocCount, writer.docCount());
if (!autoCommit) {
writer.close();
writer = new IndexWriter(directory, autoCommit, ANALYZER, false);
writer.setMaxBufferedDocs(2);
}
IndexReader reader = IndexReader.open(directory);
assertTrue(reader.isOptimized());
assertEquals(expectedDocCount, reader.numDocs());
reader.close();
}
writer.close();
}
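For reference, the expectedDocCount assertion above works out as follows; this hypothetical helper (not part of the patch) just mirrors that arithmetic:

// Doc count the test expects after (iter+1) outer iterations.
static int expectedDocCount(int iter, int numIter2, int numThreads) {
  // Each outer iteration first adds 200 documents.
  // Then thread i (0-based) runs numIter2 rounds, each adding 17*(1+i)
  // documents and deleting 9*(1+i) of them, a net of 8*(1+i) per round.
  // Summing 8*(1+i) over i = 0..numThreads-1 gives
  // 8 * numThreads*(numThreads+1)/2 per round.
  double perIteration = 200 + 8.0 * numIter2 * (numThreads / 2.0) * (1 + numThreads);
  return (int) ((1 + iter) * perIteration);
}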
/*
Run the above stress test against a RAMDirectory and then an
FSDirectory.
*/
public void testThreadedOptimize() throws Exception {
Directory directory = new MockRAMDirectory();
runTest(directory, false, null);
runTest(directory, true, null);
runTest(directory, false, new ConcurrentMergeScheduler());
runTest(directory, true, new ConcurrentMergeScheduler());
directory.close();
String tempDir = System.getProperty("tempDir");
if (tempDir == null)
throw new IOException("tempDir undefined, cannot run test");
String dirName = tempDir + "/luceneTestThreadedOptimize";
directory = FSDirectory.getDirectory(dirName);
runTest(directory, false, null);
runTest(directory, true, null);
runTest(directory, false, new ConcurrentMergeScheduler());
runTest(directory, true, new ConcurrentMergeScheduler());
directory.close();
_TestUtil.rmDir(dirName);
}
}

View File

@ -195,7 +195,7 @@ public class MockRAMDirectory extends RAMDirectory {
* RAMOutputStream.BUFFER_SIZE (now 1024) bytes.
*/
final long getRecomputedActualSizeInBytes() {
final synchronized long getRecomputedActualSizeInBytes() {
long size = 0;
Iterator it = fileMap.values().iterator();
while (it.hasNext())

View File

@ -19,6 +19,9 @@ package org.apache.lucene.util;
import java.io.File;
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.ConcurrentMergeScheduler;
public class _TestUtil {
@ -37,4 +40,13 @@ public class _TestUtil {
public static void rmDir(String dir) throws IOException {
rmDir(new File(dir));
}
public static void syncConcurrentMerges(IndexWriter writer) {
syncConcurrentMerges(writer.getMergeScheduler());
}
public static void syncConcurrentMerges(MergeScheduler ms) {
if (ms instanceof ConcurrentMergeScheduler)
((ConcurrentMergeScheduler) ms).sync();
}
}
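A typical use of the new helper, sketched from the tests above (directory, analyzer, and field are illustrative): sync the ConcurrentMergeScheduler before inspecting index structure so assertions see a stable set of segments.

Directory dir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMergeScheduler(new ConcurrentMergeScheduler());
Document doc = new Document();
doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED));
for (int i = 0; i < 100; i++)
  writer.addDocument(doc);
_TestUtil.syncConcurrentMerges(writer);   // no-op unless the scheduler is a ConcurrentMergeScheduler
SegmentInfos infos = new SegmentInfos();
infos.read(dir);                          // segment state is now stable for assertions
writer.close();
dir.close();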