From db92a9efc2eeea9a8002a8b8b05828f2411d2a7b Mon Sep 17 00:00:00 2001 From: Mike McCandless Date: Wed, 26 Apr 2017 06:38:28 -0400 Subject: [PATCH] LUCENE-7792: add optional concurrency to OfflineSorter --- lucene/CHANGES.txt | 3 + .../simpletext/SimpleTextBKDWriter.java | 2 +- .../org/apache/lucene/util/OfflineSorter.java | 383 +++++++++++------- .../util/SameThreadExecutorService.java | 69 ++++ .../org/apache/lucene/util/bkd/BKDWriter.java | 2 +- .../apache/lucene/util/TestOfflineSorter.java | 50 ++- 6 files changed, 354 insertions(+), 155 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/util/SameThreadExecutorService.java diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index a99a2be29e4..f9cca9fdfad 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -112,6 +112,9 @@ Improvements Accountable so you can see how much RAM it's using (Robert Muir, Mike McCandless) +* LUCENE-7792: OfflineSorter can now run concurrently if you pass it + an optional ExecutorService (Dawid Weiss, Mike McCandless) + Optimizations * LUCENE-7787: spatial-extras HeatmapFacetCounter will now short-circuit it's diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java index 86697ebe0b9..dd89537550d 100644 --- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java +++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java @@ -877,7 +877,7 @@ final class SimpleTextBKDWriter implements Closeable { }; } - OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc) { + OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) { /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */ @Override diff --git a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java index d27305793d1..b28752a1045 100644 --- a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java +++ b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java @@ -24,7 +24,12 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Locale; -import java.util.stream.Collectors; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.ChecksumIndexInput; @@ -73,6 +78,9 @@ public class OfflineSorter { private final int valueLength; private final String tempFileNamePrefix; + private final ExecutorService exec; + private final Semaphore partitionsInRAM; + /** * A bit more descriptive unit for constructors. * @@ -145,13 +153,13 @@ public class OfflineSorter { /** number of lines of data read */ public int lineCount; /** time spent merging sorted partitions (in milliseconds) */ - public long mergeTime; + public final AtomicLong mergeTimeMS = new AtomicLong(); /** time spent sorting data (in milliseconds) */ - public long sortTime; + public final AtomicLong sortTimeMS = new AtomicLong(); /** total time spent (in milliseconds) */ - public long totalTime; + public long totalTimeMS; /** time spent in i/o read (in milliseconds) */ - public long readTime; + public long readTimeMS; /** read buffer size (in bytes) */ public final long bufferSize = ramBufferSize.bytes; @@ -161,17 +169,15 @@ public class OfflineSorter { @Override public String toString() { return String.format(Locale.ROOT, - "time=%.2f sec. total (%.2f reading, %.2f sorting, %.2f merging), lines=%d, temp files=%d, merges=%d, soft ram limit=%.2f MB", - totalTime / 1000.0d, readTime / 1000.0d, sortTime / 1000.0d, mergeTime / 1000.0d, - lineCount, tempMergeFiles, mergeRounds, - (double) bufferSize / MB); + "time=%.2f sec. total (%.2f reading, %.2f sorting, %.2f merging), lines=%d, temp files=%d, merges=%d, soft ram limit=%.2f MB", + totalTimeMS / 1000.0d, readTimeMS / 1000.0d, sortTimeMS.get() / 1000.0d, mergeTimeMS.get() / 1000.0d, + lineCount, tempMergeFiles, mergeRounds, + (double) bufferSize / MB); } } private final BufferSize ramBufferSize; - private final Counter bufferBytesUsed = Counter.newCounter(); - private final SortableBytesRefArray buffer; SortInfo sortInfo; private int maxTempFiles; private final Comparator comparator; @@ -185,7 +191,7 @@ public class OfflineSorter { * @see BufferSize#automatic() */ public OfflineSorter(Directory dir, String tempFileNamePrefix) throws IOException { - this(dir, tempFileNamePrefix, DEFAULT_COMPARATOR, BufferSize.automatic(), MAX_TEMPFILES, -1); + this(dir, tempFileNamePrefix, DEFAULT_COMPARATOR, BufferSize.automatic(), MAX_TEMPFILES, -1, null, 0); } /** @@ -194,14 +200,30 @@ public class OfflineSorter { * @see BufferSize#automatic() */ public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator comparator) throws IOException { - this(dir, tempFileNamePrefix, comparator, BufferSize.automatic(), MAX_TEMPFILES, -1); + this(dir, tempFileNamePrefix, comparator, BufferSize.automatic(), MAX_TEMPFILES, -1, null, 0); } /** * All-details constructor. If {@code valueLength} is -1 (the default), the length of each value differs; otherwise, - * all values have the specified length. + * all values have the specified length. If you pass a non-null {@code ExecutorService} then it will be + * used to run sorting operations that can be run concurrently, and maxPartitionsInRAM is the maximum + * concurrent in-memory partitions. Thus the maximum possible RAM used by this class while sorting is + * {@code maxPartitionsInRAM * ramBufferSize}. */ - public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator comparator, BufferSize ramBufferSize, int maxTempfiles, int valueLength) { + public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator comparator, + BufferSize ramBufferSize, int maxTempfiles, int valueLength, ExecutorService exec, + int maxPartitionsInRAM) { + if (exec != null) { + this.exec = exec; + if (maxPartitionsInRAM <= 0) { + throw new IllegalArgumentException("maxPartitionsInRAM must be > 0; got " + maxPartitionsInRAM); + } + } else { + this.exec = new SameThreadExecutorService(); + maxPartitionsInRAM = 1; + } + this.partitionsInRAM = new Semaphore(maxPartitionsInRAM); + if (ramBufferSize.bytes < ABSOLUTE_MIN_SORT_BUFFER_SIZE) { throw new IllegalArgumentException(MIN_BUFFER_SIZE_MSG + ": " + ramBufferSize.bytes); } @@ -209,14 +231,11 @@ public class OfflineSorter { if (maxTempfiles < 2) { throw new IllegalArgumentException("maxTempFiles must be >= 2"); } - if (valueLength == -1) { - buffer = new BytesRefArray(bufferBytesUsed); - } else { - if (valueLength == 0 || valueLength > Short.MAX_VALUE) { - throw new IllegalArgumentException("valueLength must be 1 .. " + Short.MAX_VALUE + "; got: " + valueLength); - } - buffer = new FixedLengthBytesRefArray(valueLength); + + if (valueLength != -1 && (valueLength == 0 || valueLength > Short.MAX_VALUE)) { + throw new IllegalArgumentException("valueLength must be 1 .. " + Short.MAX_VALUE + "; got: " + valueLength); } + this.valueLength = valueLength; this.ramBufferSize = ramBufferSize; this.maxTempFiles = maxTempfiles; @@ -241,26 +260,28 @@ public class OfflineSorter { public String sort(String inputFileName) throws IOException { sortInfo = new SortInfo(); - sortInfo.totalTime = System.currentTimeMillis(); + long startMS = System.currentTimeMillis(); - List segments = new ArrayList<>(); + List> segments = new ArrayList<>(); int[] levelCounts = new int[1]; // So we can remove any partially written temp files on exception: TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); boolean success = false; - boolean[] isExhausted = new boolean[1]; try (ByteSequencesReader is = getReader(dir.openChecksumInput(inputFileName, IOContext.READONCE), inputFileName)) { - while (isExhausted[0] == false) { - int lineCount = readPartition(is, isExhausted); - if (lineCount == 0) { - assert isExhausted[0]; + while (true) { + Partition part = readPartition(is); + if (part.count == 0) { + assert part.exhausted; break; } - segments.add(sortPartition(trackingDir, lineCount)); + + Callable job = new SortPartitionTask(trackingDir, part); + + segments.add(exec.submit(job)); sortInfo.tempMergeFiles++; - sortInfo.lineCount += lineCount; + sortInfo.lineCount += part.count; levelCounts[0]++; // Handle intermediate merges; we need a while loop to "cascade" the merge when necessary: @@ -274,6 +295,10 @@ public class OfflineSorter { levelCounts[mergeLevel] = 0; mergeLevel++; } + + if (part.exhausted) { + break; + } } // TODO: we shouldn't have to do this? Can't we return a merged reader to @@ -292,13 +317,22 @@ public class OfflineSorter { result = out.getName(); } } else { - result = segments.get(0).fileName; + try { + result = segments.get(0).get().fileName; + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } catch (ExecutionException ee) { + IOUtils.reThrow(ee.getCause()); + + // dead code but javac disagrees: + result = null; + } } // We should be explicitly removing all intermediate files ourselves unless there is an exception: assert trackingDir.getCreatedFiles().size() == 1 && trackingDir.getCreatedFiles().contains(result); - sortInfo.totalTime = System.currentTimeMillis() - sortInfo.totalTime; + sortInfo.totalTimeMS = System.currentTimeMillis() - startMS; CodecUtil.checkFooter(is.in); @@ -306,6 +340,8 @@ public class OfflineSorter { return result; + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); } finally { if (success == false) { IOUtils.deleteFilesIgnoringExceptions(trackingDir, trackingDir.getCreatedFiles()); @@ -313,36 +349,6 @@ public class OfflineSorter { } } - /** Sort a single partition in-memory. */ - protected PartitionAndCount sortPartition(TrackingDirectoryWrapper trackingDir, int lineCount) throws IOException { - - try (IndexOutput tempFile = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT); - ByteSequencesWriter out = getWriter(tempFile, lineCount);) { - - BytesRef spare; - - long start = System.currentTimeMillis(); - BytesRefIterator iter = buffer.iterator(comparator); - sortInfo.sortTime += System.currentTimeMillis() - start; - - int count = 0; - while ((spare = iter.next()) != null) { - assert spare.length <= Short.MAX_VALUE; - out.write(spare); - count++; - } - - assert count == lineCount; - - // Clean up the buffer for the next partition. - buffer.clear(); - - CodecUtil.writeFooter(out.out); - - return new PartitionAndCount(lineCount, tempFile.getName()); - } - } - /** Called on exception, to check whether the checksum is also corrupt in this source, and add that * information (checksum matched or didn't) as a suppressed exception. */ private void verifyChecksum(Throwable priorException, ByteSequencesReader reader) throws IOException { @@ -352,93 +358,61 @@ public class OfflineSorter { } /** Merge the most recent {@code maxTempFile} partitions into a new partition. */ - void mergePartitions(Directory trackingDir, List segments) throws IOException { + void mergePartitions(Directory trackingDir, List> segments) throws IOException { long start = System.currentTimeMillis(); - - List segmentsToMerge; + List> segmentsToMerge; if (segments.size() > maxTempFiles) { segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size()); } else { segmentsToMerge = segments; } - long totalCount = 0; - for (PartitionAndCount segment : segmentsToMerge) { - totalCount += segment.count; - } + sortInfo.mergeRounds++; - PriorityQueue queue = new PriorityQueue(segmentsToMerge.size()) { - @Override - protected boolean lessThan(FileAndTop a, FileAndTop b) { - return comparator.compare(a.current, b.current) < 0; - } - }; - - ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()]; - - String newSegmentName = null; - - try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) { - - newSegmentName = writer.out.getName(); - - // Open streams and read the top for each file - for (int i = 0; i < segmentsToMerge.size(); i++) { - streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i).fileName, IOContext.READONCE), segmentsToMerge.get(i).fileName); - BytesRef item = null; - try { - item = streams[i].next(); - } catch (Throwable t) { - verifyChecksum(t, streams[i]); - } - assert item != null; - queue.insertWithOverflow(new FileAndTop(i, item)); - } - - // Unix utility sort() uses ordered array of files to pick the next line from, updating - // it as it reads new lines. The PQ used here is a more elegant solution and has - // a nicer theoretical complexity bound :) The entire sorting process is I/O bound anyway - // so it shouldn't make much of a difference (didn't check). - FileAndTop top; - while ((top = queue.top()) != null) { - writer.write(top.current); - try { - top.current = streams[top.fd].next(); - } catch (Throwable t) { - verifyChecksum(t, streams[top.fd]); - } - - if (top.current != null) { - queue.updateTop(); - } else { - queue.pop(); - } - } - - CodecUtil.writeFooter(writer.out); - - for(ByteSequencesReader reader : streams) { - CodecUtil.checkFooter(reader.in); - } - - sortInfo.mergeTime += System.currentTimeMillis() - start; - sortInfo.mergeRounds++; - } finally { - IOUtils.close(streams); - } - - IOUtils.deleteFiles(trackingDir, segmentsToMerge.stream().map(segment -> segment.fileName).collect(Collectors.toList())); + MergePartitionsTask task = new MergePartitionsTask(trackingDir, new ArrayList<>(segmentsToMerge)); segmentsToMerge.clear(); - segments.add(new PartitionAndCount(totalCount, newSegmentName)); + segments.add(exec.submit(task)); sortInfo.tempMergeFiles++; } + /** Holds one partition of items, either loaded into memory or based on a file. */ + private static class Partition { + public final SortableBytesRefArray buffer; + public final boolean exhausted; + public final long count; + public final String fileName; + + /** A partition loaded into memory. */ + public Partition(SortableBytesRefArray buffer, boolean exhausted) { + this.buffer = buffer; + this.fileName = null; + this.count = buffer.size(); + this.exhausted = exhausted; + } + + /** An on-disk partition. */ + public Partition(String fileName, long count) { + this.buffer = null; + this.fileName = fileName; + this.count = count; + this.exhausted = true; + } + } + /** Read in a single partition of data, setting isExhausted[0] to true if there are no more items. */ - int readPartition(ByteSequencesReader reader, boolean[] isExhausted) throws IOException { + Partition readPartition(ByteSequencesReader reader) throws IOException, InterruptedException { + if (partitionsInRAM != null) { + partitionsInRAM.acquire(); + } long start = System.currentTimeMillis(); + SortableBytesRefArray buffer; + boolean exhausted = false; + int count; if (valueLength != -1) { + // fixed length case + buffer = new FixedLengthBytesRefArray(valueLength); int limit = ramBufferSize.bytes / valueLength; for(int i=0;i { - public PartitionAndCount(long count, String fileName) { - this.count = count; - this.fileName = fileName; + private final Directory dir; + private final Partition part; + + public SortPartitionTask(Directory dir, Partition part) { + this.dir = dir; + this.part = part; + } + + @Override + public Partition call() throws IOException { + try (IndexOutput tempFile = dir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT); + ByteSequencesWriter out = getWriter(tempFile, part.buffer.size());) { + + BytesRef spare; + + long startMS = System.currentTimeMillis(); + BytesRefIterator iter = part.buffer.iterator(comparator); + sortInfo.sortTimeMS.addAndGet(System.currentTimeMillis() - startMS); + + int count = 0; + while ((spare = iter.next()) != null) { + assert spare.length <= Short.MAX_VALUE; + out.write(spare); + count++; + } + + assert count == part.count; + + CodecUtil.writeFooter(out.out); + part.buffer.clear(); + if (partitionsInRAM != null) { + partitionsInRAM.release(); + } + + return new Partition(tempFile.getName(), part.count); + } + } + } + + /** Merges multiple file-based partitions to a single on-disk partition. */ + private class MergePartitionsTask implements Callable { + private final Directory dir; + private final List> segmentsToMerge; + + public MergePartitionsTask(Directory dir, List> segmentsToMerge) { + this.dir = dir; + this.segmentsToMerge = segmentsToMerge; + } + + @Override + public Partition call() throws IOException, InterruptedException, ExecutionException { + long totalCount = 0; + for (Future segment : segmentsToMerge) { + totalCount += segment.get().count; + } + + PriorityQueue queue = new PriorityQueue(segmentsToMerge.size()) { + @Override + protected boolean lessThan(FileAndTop a, FileAndTop b) { + return comparator.compare(a.current, b.current) < 0; + } + }; + + ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()]; + + String newSegmentName = null; + + long startMS = System.currentTimeMillis(); + try (ByteSequencesWriter writer = getWriter(dir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) { + + newSegmentName = writer.out.getName(); + + // Open streams and read the top for each file + for (int i = 0; i < segmentsToMerge.size(); i++) { + Partition segment = segmentsToMerge.get(i).get(); + streams[i] = getReader(dir.openChecksumInput(segment.fileName, IOContext.READONCE), segment.fileName); + + BytesRef item = null; + try { + item = streams[i].next(); + } catch (Throwable t) { + verifyChecksum(t, streams[i]); + } + assert item != null; + queue.insertWithOverflow(new FileAndTop(i, item)); + } + + // Unix utility sort() uses ordered array of files to pick the next line from, updating + // it as it reads new lines. The PQ used here is a more elegant solution and has + // a nicer theoretical complexity bound :) The entire sorting process is I/O bound anyway + // so it shouldn't make much of a difference (didn't check). + FileAndTop top; + while ((top = queue.top()) != null) { + writer.write(top.current); + try { + top.current = streams[top.fd].next(); + } catch (Throwable t) { + verifyChecksum(t, streams[top.fd]); + } + + if (top.current != null) { + queue.updateTop(); + } else { + queue.pop(); + } + } + + CodecUtil.writeFooter(writer.out); + + for(ByteSequencesReader reader : streams) { + CodecUtil.checkFooter(reader.in); + } + + sortInfo.mergeTimeMS.addAndGet(System.currentTimeMillis() - startMS); + } finally { + IOUtils.close(streams); + } + List toDelete = new ArrayList<>(); + for (Future segment : segmentsToMerge) { + toDelete.add(segment.get().fileName); + } + IOUtils.deleteFiles(dir, toDelete); + + return new Partition(newSegmentName, totalCount); } } } diff --git a/lucene/core/src/java/org/apache/lucene/util/SameThreadExecutorService.java b/lucene/core/src/java/org/apache/lucene/util/SameThreadExecutorService.java new file mode 100644 index 00000000000..169b9f83bb3 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/util/SameThreadExecutorService.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +/** An {@code ExecutorService} that executes tasks immediately in the calling thread during submit. */ +class SameThreadExecutorService extends AbstractExecutorService { + private volatile boolean shutdown; + + @Override + public void execute(Runnable command) { + checkShutdown(); + command.run(); + } + + @Override + public List shutdownNow() { + shutdown(); + return Collections.emptyList(); + } + + @Override + public void shutdown() { + this.shutdown = true; + } + + @Override + public boolean isTerminated() { + // Simplified: we don't check for any threads hanging in execute (we could + // introduce an atomic counter, but there seems to be no point). + return shutdown == true; + } + + @Override + public boolean isShutdown() { + return shutdown == true; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + // See comment in isTerminated(); + return true; + } + + private void checkShutdown() { + if (shutdown) { + throw new RejectedExecutionException("Executor is shut down."); + } + } +} diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java index 8a2356b0106..1575a5b28be 100644 --- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java +++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java @@ -884,7 +884,7 @@ public class BKDWriter implements Closeable { }; } - OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc) { + OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) { /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */ @Override diff --git a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java index 839f1037204..68ac0a285c4 100644 --- a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java +++ b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java @@ -24,6 +24,10 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.codecs.CodecUtil; @@ -73,11 +77,25 @@ public class TestOfflineSorter extends LuceneTestCase { } } + private ExecutorService randomExecutorServiceOrNull() { + if (random().nextBoolean()) { + return null; + } else { + return new ThreadPoolExecutor(1, TestUtil.nextInt(random(), 2, 6), Long.MAX_VALUE, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + new NamedThreadFactory("TestIndexSearcher")); + } + } + public void testIntermediateMerges() throws Exception { // Sort 20 mb worth of data with 1mb buffer, binary merging. try (Directory dir = newDirectory()) { - SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2, -1), + ExecutorService exec = randomExecutorServiceOrNull(); + SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2, -1, exec, TestUtil.nextInt(random(), 1, 4)), generateRandom((int)OfflineSorter.MB * 20)); + if (exec != null) { + exec.shutdownNow(); + } assertTrue(info.mergeRounds > 10); } } @@ -85,8 +103,12 @@ public class TestOfflineSorter extends LuceneTestCase { public void testSmallRandom() throws Exception { // Sort 20 mb worth of data with 1mb buffer. try (Directory dir = newDirectory()) { - SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, -1), + ExecutorService exec = randomExecutorServiceOrNull(); + SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, -1, exec, TestUtil.nextInt(random(), 1, 4)), generateRandom((int)OfflineSorter.MB * 20)); + if (exec != null) { + exec.shutdownNow(); + } assertEquals(3, sortInfo.mergeRounds); } } @@ -95,8 +117,12 @@ public class TestOfflineSorter extends LuceneTestCase { public void testLargerRandom() throws Exception { // Sort 100MB worth of data with 15mb buffer. try (Directory dir = newFSDirectory(createTempDir())) { - checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES, -1), + ExecutorService exec = randomExecutorServiceOrNull(); + checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES, -1, exec, TestUtil.nextInt(random(), 1, 4)), generateRandom((int)OfflineSorter.MB * 100)); + if (exec != null) { + exec.shutdownNow(); + } } } @@ -358,7 +384,7 @@ public class TestOfflineSorter extends LuceneTestCase { writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3))); CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> { - new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1).sort(unsorted.getName()); + new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1, null, 0).sort(unsorted.getName()); }); assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); } @@ -408,7 +434,7 @@ public class TestOfflineSorter extends LuceneTestCase { writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3))); EOFException e = expectThrows(EOFException.class, () -> { - new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1).sort(unsorted.getName()); + new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1, null, 0).sort(unsorted.getName()); }); assertEquals(1, e.getSuppressed().length); assertTrue(e.getSuppressed()[0] instanceof CorruptIndexException); @@ -430,8 +456,12 @@ public class TestOfflineSorter extends LuceneTestCase { CodecUtil.writeFooter(out); } - OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES); + ExecutorService exec = randomExecutorServiceOrNull(); + OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES, exec, TestUtil.nextInt(random(), 1, 4)); sorter.sort(out.getName()); + if (exec != null) { + exec.shutdownNow(); + } // 1 MB of ints with 4 MH heap allowed should have been sorted in a single heap partition: assertEquals(0, sorter.sortInfo.mergeRounds); dir.close(); @@ -448,7 +478,7 @@ public class TestOfflineSorter extends LuceneTestCase { CodecUtil.writeFooter(out); } - OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Long.BYTES); + OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Long.BYTES, null, 0); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { sorter.sort(out.getName()); }); @@ -467,7 +497,7 @@ public class TestOfflineSorter extends LuceneTestCase { CodecUtil.writeFooter(out); } - new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES) { + new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES, null, 0) { @Override protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException { ByteSequencesReader other = super.getReader(in, name); @@ -502,13 +532,13 @@ public class TestOfflineSorter extends LuceneTestCase { e = expectThrows(IllegalArgumentException.class, () -> { new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR, - BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, 0); + BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, 0, null, 0); }); assertEquals("valueLength must be 1 .. 32767; got: 0", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> { new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR, - BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, Integer.MAX_VALUE); + BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, Integer.MAX_VALUE, null, 0); }); assertEquals("valueLength must be 1 .. 32767; got: 2147483647", e.getMessage()); }