mirror of https://github.com/apache/lucene.git
LUCENE-7792: add optional concurrency to OfflineSorter
This commit is contained in:
parent
d8ec25bdc5
commit
db92a9efc2
|
@ -112,6 +112,9 @@ Improvements
|
|||
Accountable so you can see how much RAM it's using (Robert Muir,
|
||||
Mike McCandless)
|
||||
|
||||
* LUCENE-7792: OfflineSorter can now run concurrently if you pass it
|
||||
an optional ExecutorService (Dawid Weiss, Mike McCandless)
|
||||
|
||||
Optimizations
|
||||
|
||||
* LUCENE-7787: spatial-extras HeatmapFacetCounter will now short-circuit its
|
||||
|
|
|
@ -877,7 +877,7 @@ final class SimpleTextBKDWriter implements Closeable {
|
|||
};
|
||||
}
|
||||
|
||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc) {
|
||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) {
|
||||
|
||||
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
|
||||
@Override
|
||||
|
|
|
@ -24,7 +24,12 @@ import java.util.ArrayList;
|
|||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.lucene.codecs.CodecUtil;
|
||||
import org.apache.lucene.store.ChecksumIndexInput;
|
||||
|
@ -73,6 +78,9 @@ public class OfflineSorter {
|
|||
private final int valueLength;
|
||||
private final String tempFileNamePrefix;
|
||||
|
||||
private final ExecutorService exec;
|
||||
private final Semaphore partitionsInRAM;
|
||||
|
||||
/**
|
||||
* A bit more descriptive unit for constructors.
|
||||
*
|
||||
|
@ -145,13 +153,13 @@ public class OfflineSorter {
|
|||
/** number of lines of data read */
|
||||
public int lineCount;
|
||||
/** time spent merging sorted partitions (in milliseconds) */
|
||||
public long mergeTime;
|
||||
public final AtomicLong mergeTimeMS = new AtomicLong();
|
||||
/** time spent sorting data (in milliseconds) */
|
||||
public long sortTime;
|
||||
public final AtomicLong sortTimeMS = new AtomicLong();
|
||||
/** total time spent (in milliseconds) */
|
||||
public long totalTime;
|
||||
public long totalTimeMS;
|
||||
/** time spent in i/o read (in milliseconds) */
|
||||
public long readTime;
|
||||
public long readTimeMS;
|
||||
/** read buffer size (in bytes) */
|
||||
public final long bufferSize = ramBufferSize.bytes;
|
||||
|
||||
|
@ -161,17 +169,15 @@ public class OfflineSorter {
|
|||
@Override
|
||||
public String toString() {
|
||||
return String.format(Locale.ROOT,
|
||||
"time=%.2f sec. total (%.2f reading, %.2f sorting, %.2f merging), lines=%d, temp files=%d, merges=%d, soft ram limit=%.2f MB",
|
||||
totalTime / 1000.0d, readTime / 1000.0d, sortTime / 1000.0d, mergeTime / 1000.0d,
|
||||
lineCount, tempMergeFiles, mergeRounds,
|
||||
(double) bufferSize / MB);
|
||||
"time=%.2f sec. total (%.2f reading, %.2f sorting, %.2f merging), lines=%d, temp files=%d, merges=%d, soft ram limit=%.2f MB",
|
||||
totalTimeMS / 1000.0d, readTimeMS / 1000.0d, sortTimeMS.get() / 1000.0d, mergeTimeMS.get() / 1000.0d,
|
||||
lineCount, tempMergeFiles, mergeRounds,
|
||||
(double) bufferSize / MB);
|
||||
}
|
||||
}
|
||||
|
||||
private final BufferSize ramBufferSize;
|
||||
|
||||
private final Counter bufferBytesUsed = Counter.newCounter();
|
||||
private final SortableBytesRefArray buffer;
|
||||
SortInfo sortInfo;
|
||||
private int maxTempFiles;
|
||||
private final Comparator<BytesRef> comparator;
|
||||
|
@ -185,7 +191,7 @@ public class OfflineSorter {
|
|||
* @see BufferSize#automatic()
|
||||
*/
|
||||
public OfflineSorter(Directory dir, String tempFileNamePrefix) throws IOException {
|
||||
this(dir, tempFileNamePrefix, DEFAULT_COMPARATOR, BufferSize.automatic(), MAX_TEMPFILES, -1);
|
||||
this(dir, tempFileNamePrefix, DEFAULT_COMPARATOR, BufferSize.automatic(), MAX_TEMPFILES, -1, null, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -194,14 +200,30 @@ public class OfflineSorter {
|
|||
* @see BufferSize#automatic()
|
||||
*/
|
||||
public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator) throws IOException {
|
||||
this(dir, tempFileNamePrefix, comparator, BufferSize.automatic(), MAX_TEMPFILES, -1);
|
||||
this(dir, tempFileNamePrefix, comparator, BufferSize.automatic(), MAX_TEMPFILES, -1, null, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* All-details constructor. If {@code valueLength} is -1 (the default), the length of each value differs; otherwise,
|
||||
* all values have the specified length.
|
||||
* all values have the specified length. If you pass a non-null {@code ExecutorService} then it will be
|
||||
* used to run sorting operations that can be run concurrently, and maxPartitionsInRAM is the maximum
|
||||
* concurrent in-memory partitions. Thus the maximum possible RAM used by this class while sorting is
|
||||
* {@code maxPartitionsInRAM * ramBufferSize}.
|
||||
*/
|
||||
public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator, BufferSize ramBufferSize, int maxTempfiles, int valueLength) {
|
||||
public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator,
|
||||
BufferSize ramBufferSize, int maxTempfiles, int valueLength, ExecutorService exec,
|
||||
int maxPartitionsInRAM) {
|
||||
if (exec != null) {
|
||||
this.exec = exec;
|
||||
if (maxPartitionsInRAM <= 0) {
|
||||
throw new IllegalArgumentException("maxPartitionsInRAM must be > 0; got " + maxPartitionsInRAM);
|
||||
}
|
||||
} else {
|
||||
this.exec = new SameThreadExecutorService();
|
||||
maxPartitionsInRAM = 1;
|
||||
}
|
||||
this.partitionsInRAM = new Semaphore(maxPartitionsInRAM);
|
||||
|
||||
if (ramBufferSize.bytes < ABSOLUTE_MIN_SORT_BUFFER_SIZE) {
|
||||
throw new IllegalArgumentException(MIN_BUFFER_SIZE_MSG + ": " + ramBufferSize.bytes);
|
||||
}
|
||||
|
@ -209,14 +231,11 @@ public class OfflineSorter {
|
|||
if (maxTempfiles < 2) {
|
||||
throw new IllegalArgumentException("maxTempFiles must be >= 2");
|
||||
}
|
||||
if (valueLength == -1) {
|
||||
buffer = new BytesRefArray(bufferBytesUsed);
|
||||
} else {
|
||||
if (valueLength == 0 || valueLength > Short.MAX_VALUE) {
|
||||
throw new IllegalArgumentException("valueLength must be 1 .. " + Short.MAX_VALUE + "; got: " + valueLength);
|
||||
}
|
||||
buffer = new FixedLengthBytesRefArray(valueLength);
|
||||
|
||||
if (valueLength != -1 && (valueLength == 0 || valueLength > Short.MAX_VALUE)) {
|
||||
throw new IllegalArgumentException("valueLength must be 1 .. " + Short.MAX_VALUE + "; got: " + valueLength);
|
||||
}
|
||||
|
||||
this.valueLength = valueLength;
|
||||
this.ramBufferSize = ramBufferSize;
|
||||
this.maxTempFiles = maxTempfiles;
|
||||
|
@ -241,26 +260,28 @@ public class OfflineSorter {
|
|||
public String sort(String inputFileName) throws IOException {
|
||||
|
||||
sortInfo = new SortInfo();
|
||||
sortInfo.totalTime = System.currentTimeMillis();
|
||||
long startMS = System.currentTimeMillis();
|
||||
|
||||
List<PartitionAndCount> segments = new ArrayList<>();
|
||||
List<Future<Partition>> segments = new ArrayList<>();
|
||||
int[] levelCounts = new int[1];
|
||||
|
||||
// So we can remove any partially written temp files on exception:
|
||||
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
|
||||
|
||||
boolean success = false;
|
||||
boolean[] isExhausted = new boolean[1];
|
||||
try (ByteSequencesReader is = getReader(dir.openChecksumInput(inputFileName, IOContext.READONCE), inputFileName)) {
|
||||
while (isExhausted[0] == false) {
|
||||
int lineCount = readPartition(is, isExhausted);
|
||||
if (lineCount == 0) {
|
||||
assert isExhausted[0];
|
||||
while (true) {
|
||||
Partition part = readPartition(is);
|
||||
if (part.count == 0) {
|
||||
assert part.exhausted;
|
||||
break;
|
||||
}
|
||||
segments.add(sortPartition(trackingDir, lineCount));
|
||||
|
||||
Callable<Partition> job = new SortPartitionTask(trackingDir, part);
|
||||
|
||||
segments.add(exec.submit(job));
|
||||
sortInfo.tempMergeFiles++;
|
||||
sortInfo.lineCount += lineCount;
|
||||
sortInfo.lineCount += part.count;
|
||||
levelCounts[0]++;
|
||||
|
||||
// Handle intermediate merges; we need a while loop to "cascade" the merge when necessary:
|
||||
|
@ -274,6 +295,10 @@ public class OfflineSorter {
|
|||
levelCounts[mergeLevel] = 0;
|
||||
mergeLevel++;
|
||||
}
|
||||
|
||||
if (part.exhausted) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: we shouldn't have to do this? Can't we return a merged reader to
|
||||
|
@ -292,13 +317,22 @@ public class OfflineSorter {
|
|||
result = out.getName();
|
||||
}
|
||||
} else {
|
||||
result = segments.get(0).fileName;
|
||||
try {
|
||||
result = segments.get(0).get().fileName;
|
||||
} catch (InterruptedException ie) {
|
||||
throw new ThreadInterruptedException(ie);
|
||||
} catch (ExecutionException ee) {
|
||||
IOUtils.reThrow(ee.getCause());
|
||||
|
||||
// dead code but javac disagrees:
|
||||
result = null;
|
||||
}
|
||||
}
|
||||
|
||||
// We should be explicitly removing all intermediate files ourselves unless there is an exception:
|
||||
assert trackingDir.getCreatedFiles().size() == 1 && trackingDir.getCreatedFiles().contains(result);
|
||||
|
||||
sortInfo.totalTime = System.currentTimeMillis() - sortInfo.totalTime;
|
||||
sortInfo.totalTimeMS = System.currentTimeMillis() - startMS;
|
||||
|
||||
CodecUtil.checkFooter(is.in);
|
||||
|
||||
|
@ -306,6 +340,8 @@ public class OfflineSorter {
|
|||
|
||||
return result;
|
||||
|
||||
} catch (InterruptedException ie) {
|
||||
throw new ThreadInterruptedException(ie);
|
||||
} finally {
|
||||
if (success == false) {
|
||||
IOUtils.deleteFilesIgnoringExceptions(trackingDir, trackingDir.getCreatedFiles());
|
||||
|
@ -313,36 +349,6 @@ public class OfflineSorter {
|
|||
}
|
||||
}
|
||||
|
||||
/** Sort a single partition in-memory. */
|
||||
protected PartitionAndCount sortPartition(TrackingDirectoryWrapper trackingDir, int lineCount) throws IOException {
|
||||
|
||||
try (IndexOutput tempFile = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT);
|
||||
ByteSequencesWriter out = getWriter(tempFile, lineCount);) {
|
||||
|
||||
BytesRef spare;
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
BytesRefIterator iter = buffer.iterator(comparator);
|
||||
sortInfo.sortTime += System.currentTimeMillis() - start;
|
||||
|
||||
int count = 0;
|
||||
while ((spare = iter.next()) != null) {
|
||||
assert spare.length <= Short.MAX_VALUE;
|
||||
out.write(spare);
|
||||
count++;
|
||||
}
|
||||
|
||||
assert count == lineCount;
|
||||
|
||||
// Clean up the buffer for the next partition.
|
||||
buffer.clear();
|
||||
|
||||
CodecUtil.writeFooter(out.out);
|
||||
|
||||
return new PartitionAndCount(lineCount, tempFile.getName());
|
||||
}
|
||||
}
|
||||
|
||||
/** Called on exception, to check whether the checksum is also corrupt in this source, and add that
|
||||
* information (checksum matched or didn't) as a suppressed exception. */
|
||||
private void verifyChecksum(Throwable priorException, ByteSequencesReader reader) throws IOException {
|
||||
|
@ -352,93 +358,61 @@ public class OfflineSorter {
|
|||
}
|
||||
|
||||
/** Merge the most recent {@code maxTempFiles} partitions into a new partition. */
|
||||
void mergePartitions(Directory trackingDir, List<PartitionAndCount> segments) throws IOException {
|
||||
void mergePartitions(Directory trackingDir, List<Future<Partition>> segments) throws IOException {
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
List<PartitionAndCount> segmentsToMerge;
|
||||
List<Future<Partition>> segmentsToMerge;
|
||||
if (segments.size() > maxTempFiles) {
|
||||
segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size());
|
||||
} else {
|
||||
segmentsToMerge = segments;
|
||||
}
|
||||
|
||||
long totalCount = 0;
|
||||
for (PartitionAndCount segment : segmentsToMerge) {
|
||||
totalCount += segment.count;
|
||||
}
|
||||
sortInfo.mergeRounds++;
|
||||
|
||||
PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
|
||||
@Override
|
||||
protected boolean lessThan(FileAndTop a, FileAndTop b) {
|
||||
return comparator.compare(a.current, b.current) < 0;
|
||||
}
|
||||
};
|
||||
|
||||
ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()];
|
||||
|
||||
String newSegmentName = null;
|
||||
|
||||
try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) {
|
||||
|
||||
newSegmentName = writer.out.getName();
|
||||
|
||||
// Open streams and read the top for each file
|
||||
for (int i = 0; i < segmentsToMerge.size(); i++) {
|
||||
streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i).fileName, IOContext.READONCE), segmentsToMerge.get(i).fileName);
|
||||
BytesRef item = null;
|
||||
try {
|
||||
item = streams[i].next();
|
||||
} catch (Throwable t) {
|
||||
verifyChecksum(t, streams[i]);
|
||||
}
|
||||
assert item != null;
|
||||
queue.insertWithOverflow(new FileAndTop(i, item));
|
||||
}
|
||||
|
||||
// Unix utility sort() uses ordered array of files to pick the next line from, updating
|
||||
// it as it reads new lines. The PQ used here is a more elegant solution and has
|
||||
// a nicer theoretical complexity bound :) The entire sorting process is I/O bound anyway
|
||||
// so it shouldn't make much of a difference (didn't check).
|
||||
FileAndTop top;
|
||||
while ((top = queue.top()) != null) {
|
||||
writer.write(top.current);
|
||||
try {
|
||||
top.current = streams[top.fd].next();
|
||||
} catch (Throwable t) {
|
||||
verifyChecksum(t, streams[top.fd]);
|
||||
}
|
||||
|
||||
if (top.current != null) {
|
||||
queue.updateTop();
|
||||
} else {
|
||||
queue.pop();
|
||||
}
|
||||
}
|
||||
|
||||
CodecUtil.writeFooter(writer.out);
|
||||
|
||||
for(ByteSequencesReader reader : streams) {
|
||||
CodecUtil.checkFooter(reader.in);
|
||||
}
|
||||
|
||||
sortInfo.mergeTime += System.currentTimeMillis() - start;
|
||||
sortInfo.mergeRounds++;
|
||||
} finally {
|
||||
IOUtils.close(streams);
|
||||
}
|
||||
|
||||
IOUtils.deleteFiles(trackingDir, segmentsToMerge.stream().map(segment -> segment.fileName).collect(Collectors.toList()));
|
||||
MergePartitionsTask task = new MergePartitionsTask(trackingDir, new ArrayList<>(segmentsToMerge));
|
||||
|
||||
segmentsToMerge.clear();
|
||||
segments.add(new PartitionAndCount(totalCount, newSegmentName));
|
||||
segments.add(exec.submit(task));
|
||||
|
||||
sortInfo.tempMergeFiles++;
|
||||
}
|
||||
|
||||
/** Holds one partition of items, either loaded into memory or based on a file. */
|
||||
private static class Partition {
|
||||
public final SortableBytesRefArray buffer;
|
||||
public final boolean exhausted;
|
||||
public final long count;
|
||||
public final String fileName;
|
||||
|
||||
/** A partition loaded into memory. */
|
||||
public Partition(SortableBytesRefArray buffer, boolean exhausted) {
|
||||
this.buffer = buffer;
|
||||
this.fileName = null;
|
||||
this.count = buffer.size();
|
||||
this.exhausted = exhausted;
|
||||
}
|
||||
|
||||
/** An on-disk partition. */
|
||||
public Partition(String fileName, long count) {
|
||||
this.buffer = null;
|
||||
this.fileName = fileName;
|
||||
this.count = count;
|
||||
this.exhausted = true;
|
||||
}
|
||||
}
|
||||
|
||||
/** Read in a single partition of data; the returned partition's {@code exhausted} flag is true if there are no more items. */
|
||||
int readPartition(ByteSequencesReader reader, boolean[] isExhausted) throws IOException {
|
||||
Partition readPartition(ByteSequencesReader reader) throws IOException, InterruptedException {
|
||||
if (partitionsInRAM != null) {
|
||||
partitionsInRAM.acquire();
|
||||
}
|
||||
long start = System.currentTimeMillis();
|
||||
SortableBytesRefArray buffer;
|
||||
boolean exhausted = false;
|
||||
int count;
|
||||
if (valueLength != -1) {
|
||||
// fixed length case
|
||||
buffer = new FixedLengthBytesRefArray(valueLength);
|
||||
int limit = ramBufferSize.bytes / valueLength;
|
||||
for(int i=0;i<limit;i++) {
|
||||
BytesRef item = null;
|
||||
|
@ -448,12 +422,14 @@ public class OfflineSorter {
|
|||
verifyChecksum(t, reader);
|
||||
}
|
||||
if (item == null) {
|
||||
isExhausted[0] = true;
|
||||
exhausted = true;
|
||||
break;
|
||||
}
|
||||
buffer.append(item);
|
||||
}
|
||||
} else {
|
||||
Counter bufferBytesUsed = Counter.newCounter();
|
||||
buffer = new BytesRefArray(bufferBytesUsed);
|
||||
while (true) {
|
||||
BytesRef item = null;
|
||||
try {
|
||||
|
@ -462,7 +438,7 @@ public class OfflineSorter {
|
|||
verifyChecksum(t, reader);
|
||||
}
|
||||
if (item == null) {
|
||||
isExhausted[0] = true;
|
||||
exhausted = true;
|
||||
break;
|
||||
}
|
||||
buffer.append(item);
|
||||
|
@ -473,8 +449,9 @@ public class OfflineSorter {
|
|||
}
|
||||
}
|
||||
}
|
||||
sortInfo.readTime += System.currentTimeMillis() - start;
|
||||
return buffer.size();
|
||||
sortInfo.readTimeMS += System.currentTimeMillis() - start;
|
||||
|
||||
return new Partition(buffer, exhausted);
|
||||
}
|
||||
|
||||
static class FileAndTop {
|
||||
|
@ -606,13 +583,133 @@ public class OfflineSorter {
|
|||
return comparator;
|
||||
}
|
||||
|
||||
private static class PartitionAndCount {
|
||||
final long count;
|
||||
final String fileName;
|
||||
/** Sorts one in-memory partition, writes it to disk, and returns the resulting file-based partition. */
|
||||
private class SortPartitionTask implements Callable<Partition> {
|
||||
|
||||
public PartitionAndCount(long count, String fileName) {
|
||||
this.count = count;
|
||||
this.fileName = fileName;
|
||||
private final Directory dir;
|
||||
private final Partition part;
|
||||
|
||||
public SortPartitionTask(Directory dir, Partition part) {
|
||||
this.dir = dir;
|
||||
this.part = part;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Partition call() throws IOException {
|
||||
try (IndexOutput tempFile = dir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT);
|
||||
ByteSequencesWriter out = getWriter(tempFile, part.buffer.size());) {
|
||||
|
||||
BytesRef spare;
|
||||
|
||||
long startMS = System.currentTimeMillis();
|
||||
BytesRefIterator iter = part.buffer.iterator(comparator);
|
||||
sortInfo.sortTimeMS.addAndGet(System.currentTimeMillis() - startMS);
|
||||
|
||||
int count = 0;
|
||||
while ((spare = iter.next()) != null) {
|
||||
assert spare.length <= Short.MAX_VALUE;
|
||||
out.write(spare);
|
||||
count++;
|
||||
}
|
||||
|
||||
assert count == part.count;
|
||||
|
||||
CodecUtil.writeFooter(out.out);
|
||||
part.buffer.clear();
|
||||
if (partitionsInRAM != null) {
|
||||
partitionsInRAM.release();
|
||||
}
|
||||
|
||||
return new Partition(tempFile.getName(), part.count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Merges multiple file-based partitions to a single on-disk partition. */
|
||||
private class MergePartitionsTask implements Callable<Partition> {
|
||||
private final Directory dir;
|
||||
private final List<Future<Partition>> segmentsToMerge;
|
||||
|
||||
public MergePartitionsTask(Directory dir, List<Future<Partition>> segmentsToMerge) {
|
||||
this.dir = dir;
|
||||
this.segmentsToMerge = segmentsToMerge;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Partition call() throws IOException, InterruptedException, ExecutionException {
|
||||
long totalCount = 0;
|
||||
for (Future<Partition> segment : segmentsToMerge) {
|
||||
totalCount += segment.get().count;
|
||||
}
|
||||
|
||||
PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
|
||||
@Override
|
||||
protected boolean lessThan(FileAndTop a, FileAndTop b) {
|
||||
return comparator.compare(a.current, b.current) < 0;
|
||||
}
|
||||
};
|
||||
|
||||
ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()];
|
||||
|
||||
String newSegmentName = null;
|
||||
|
||||
long startMS = System.currentTimeMillis();
|
||||
try (ByteSequencesWriter writer = getWriter(dir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) {
|
||||
|
||||
newSegmentName = writer.out.getName();
|
||||
|
||||
// Open streams and read the top for each file
|
||||
for (int i = 0; i < segmentsToMerge.size(); i++) {
|
||||
Partition segment = segmentsToMerge.get(i).get();
|
||||
streams[i] = getReader(dir.openChecksumInput(segment.fileName, IOContext.READONCE), segment.fileName);
|
||||
|
||||
BytesRef item = null;
|
||||
try {
|
||||
item = streams[i].next();
|
||||
} catch (Throwable t) {
|
||||
verifyChecksum(t, streams[i]);
|
||||
}
|
||||
assert item != null;
|
||||
queue.insertWithOverflow(new FileAndTop(i, item));
|
||||
}
|
||||
|
||||
// Unix utility sort() uses ordered array of files to pick the next line from, updating
|
||||
// it as it reads new lines. The PQ used here is a more elegant solution and has
|
||||
// a nicer theoretical complexity bound :) The entire sorting process is I/O bound anyway
|
||||
// so it shouldn't make much of a difference (didn't check).
|
||||
FileAndTop top;
|
||||
while ((top = queue.top()) != null) {
|
||||
writer.write(top.current);
|
||||
try {
|
||||
top.current = streams[top.fd].next();
|
||||
} catch (Throwable t) {
|
||||
verifyChecksum(t, streams[top.fd]);
|
||||
}
|
||||
|
||||
if (top.current != null) {
|
||||
queue.updateTop();
|
||||
} else {
|
||||
queue.pop();
|
||||
}
|
||||
}
|
||||
|
||||
CodecUtil.writeFooter(writer.out);
|
||||
|
||||
for(ByteSequencesReader reader : streams) {
|
||||
CodecUtil.checkFooter(reader.in);
|
||||
}
|
||||
|
||||
sortInfo.mergeTimeMS.addAndGet(System.currentTimeMillis() - startMS);
|
||||
} finally {
|
||||
IOUtils.close(streams);
|
||||
}
|
||||
List<String> toDelete = new ArrayList<>();
|
||||
for (Future<Partition> segment : segmentsToMerge) {
|
||||
toDelete.add(segment.get().fileName);
|
||||
}
|
||||
IOUtils.deleteFiles(dir, toDelete);
|
||||
|
||||
return new Partition(newSegmentName, totalCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.util;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.AbstractExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
 * An {@code ExecutorService} that executes tasks immediately in the calling thread during submit.
 *
 * <p>Because every task runs to completion inside {@link #execute(Runnable)}, there is never a
 * queue of pending work: once {@link #shutdown()} has been called the executor is considered
 * terminated.
 */
class SameThreadExecutorService extends AbstractExecutorService {
  private volatile boolean shutdown;

  /**
   * Runs {@code command} synchronously in the caller's thread.
   *
   * @throws RejectedExecutionException if this executor has been shut down
   */
  @Override
  public void execute(Runnable command) {
    checkShutdown();
    command.run();
  }

  /** Shuts down; there are never queued tasks, so the returned list is always empty. */
  @Override
  public List<Runnable> shutdownNow() {
    shutdown();
    return Collections.emptyList();
  }

  @Override
  public void shutdown() {
    this.shutdown = true;
  }

  @Override
  public boolean isTerminated() {
    // Simplified: we don't check for any threads hanging in execute (we could
    // introduce an atomic counter, but there seems to be no point).
    return shutdown;
  }

  @Override
  public boolean isShutdown() {
    return shutdown;
  }

  /**
   * Never blocks. Returns the same answer as {@link #isTerminated()} so the two methods cannot
   * disagree: {@code true} after shutdown, {@code false} while the executor is still accepting
   * tasks.
   */
  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    // See comment in isTerminated();
    return shutdown;
  }

  private void checkShutdown() {
    if (shutdown) {
      throw new RejectedExecutionException("Executor is shut down.");
    }
  }
}
|
|
@ -884,7 +884,7 @@ public class BKDWriter implements Closeable {
|
|||
};
|
||||
}
|
||||
|
||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc) {
|
||||
OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) {
|
||||
|
||||
/** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,10 @@ import java.nio.file.Path;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.lucene.codecs.CodecUtil;
|
||||
|
@ -73,11 +77,25 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private ExecutorService randomExecutorServiceOrNull() {
|
||||
if (random().nextBoolean()) {
|
||||
return null;
|
||||
} else {
|
||||
return new ThreadPoolExecutor(1, TestUtil.nextInt(random(), 2, 6), Long.MAX_VALUE, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(),
|
||||
new NamedThreadFactory("TestIndexSearcher"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testIntermediateMerges() throws Exception {
|
||||
// Sort 20 mb worth of data with 1mb buffer, binary merging.
|
||||
try (Directory dir = newDirectory()) {
|
||||
SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2, -1),
|
||||
ExecutorService exec = randomExecutorServiceOrNull();
|
||||
SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2, -1, exec, TestUtil.nextInt(random(), 1, 4)),
|
||||
generateRandom((int)OfflineSorter.MB * 20));
|
||||
if (exec != null) {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
assertTrue(info.mergeRounds > 10);
|
||||
}
|
||||
}
|
||||
|
@ -85,8 +103,12 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
public void testSmallRandom() throws Exception {
|
||||
// Sort 20 mb worth of data with 1mb buffer.
|
||||
try (Directory dir = newDirectory()) {
|
||||
SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, -1),
|
||||
ExecutorService exec = randomExecutorServiceOrNull();
|
||||
SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, -1, exec, TestUtil.nextInt(random(), 1, 4)),
|
||||
generateRandom((int)OfflineSorter.MB * 20));
|
||||
if (exec != null) {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
assertEquals(3, sortInfo.mergeRounds);
|
||||
}
|
||||
}
|
||||
|
@ -95,8 +117,12 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
public void testLargerRandom() throws Exception {
|
||||
// Sort 100MB worth of data with 16mb buffer.
|
||||
try (Directory dir = newFSDirectory(createTempDir())) {
|
||||
checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES, -1),
|
||||
ExecutorService exec = randomExecutorServiceOrNull();
|
||||
checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES, -1, exec, TestUtil.nextInt(random(), 1, 4)),
|
||||
generateRandom((int)OfflineSorter.MB * 100));
|
||||
if (exec != null) {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -358,7 +384,7 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3)));
|
||||
|
||||
CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> {
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1).sort(unsorted.getName());
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1, null, 0).sort(unsorted.getName());
|
||||
});
|
||||
assertTrue(e.getMessage().contains("checksum failed (hardware problem?)"));
|
||||
}
|
||||
|
@ -408,7 +434,7 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3)));
|
||||
|
||||
EOFException e = expectThrows(EOFException.class, () -> {
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1).sort(unsorted.getName());
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1, null, 0).sort(unsorted.getName());
|
||||
});
|
||||
assertEquals(1, e.getSuppressed().length);
|
||||
assertTrue(e.getSuppressed()[0] instanceof CorruptIndexException);
|
||||
|
@ -430,8 +456,12 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
CodecUtil.writeFooter(out);
|
||||
}
|
||||
|
||||
OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES);
|
||||
ExecutorService exec = randomExecutorServiceOrNull();
|
||||
OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES, exec, TestUtil.nextInt(random(), 1, 4));
|
||||
sorter.sort(out.getName());
|
||||
if (exec != null) {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
// 1 MB of ints with 4 MB heap allowed should have been sorted in a single heap partition:
|
||||
assertEquals(0, sorter.sortInfo.mergeRounds);
|
||||
dir.close();
|
||||
|
@ -448,7 +478,7 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
CodecUtil.writeFooter(out);
|
||||
}
|
||||
|
||||
OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Long.BYTES);
|
||||
OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Long.BYTES, null, 0);
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
|
||||
sorter.sort(out.getName());
|
||||
});
|
||||
|
@ -467,7 +497,7 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
CodecUtil.writeFooter(out);
|
||||
}
|
||||
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES) {
|
||||
new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES, null, 0) {
|
||||
@Override
|
||||
protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
|
||||
ByteSequencesReader other = super.getReader(in, name);
|
||||
|
@ -502,13 +532,13 @@ public class TestOfflineSorter extends LuceneTestCase {
|
|||
e = expectThrows(IllegalArgumentException.class,
|
||||
() -> {
|
||||
new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR,
|
||||
BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, 0);
|
||||
BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, 0, null, 0);
|
||||
});
|
||||
assertEquals("valueLength must be 1 .. 32767; got: 0", e.getMessage());
|
||||
e = expectThrows(IllegalArgumentException.class,
|
||||
() -> {
|
||||
new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR,
|
||||
BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, Integer.MAX_VALUE);
|
||||
BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, Integer.MAX_VALUE, null, 0);
|
||||
});
|
||||
assertEquals("valueLength must be 1 .. 32767; got: 2147483647", e.getMessage());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue