diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java index 5dff5299503..2c1ab96598a 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java +++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java @@ -69,7 +69,21 @@ import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.solr.common.util.Utils.makeMap; +/** + * Prepares and writes the documents requested by /export requests + * + * {@link ExportWriter} gathers and sorts the documents for a core using "stream sorting". + *

+ * Stream sorting works by repeatedly processing and modifying a bitmap of matching documents. Each pass over the + * bitmap identifies the smallest {@link #DOCUMENT_BATCH_SIZE} docs that haven't been sent yet and stores them in a + * Priority Queue. They are then exported (written across the wire) and marked as sent (unset in the bitmap). + * This process repeats until all matching documents have been sent. + *

+ * This streaming approach is light on memory (only {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at + * once), and it allows {@link ExportWriter} to scale well with regard to numDocs. + */ public class ExportWriter implements SolrCore.RawWriter, Closeable { + private static final int DOCUMENT_BATCH_SIZE = 30000; private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private OutputStreamWriter respWriter; final SolrQueryRequest req; @@ -211,72 +225,77 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable { } + protected void identifyLowestSortingUnexportedDocs(List leaves, SortDoc sortDoc, SortQueue queue) throws IOException { + queue.reset(); + SortDoc top = queue.top(); + for (int i = 0; i < leaves.size(); i++) { + sortDoc.setNextReader(leaves.get(i)); + DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here + int docId; + while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + sortDoc.setValues(docId); + if (top.lessThan(sortDoc)) { + top.setValues(sortDoc); + top = queue.updateTop(); + } + } + } + } + + protected int transferBatchToArrayForOutput(SortQueue queue, SortDoc[] destinationArr) { + int outDocsIndex = -1; + for (int i = 0; i < queue.maxSize; i++) { + SortDoc s = queue.pop(); + if (s.docId > -1) { + destinationArr[++outDocsIndex] = s; + } + } + + return outDocsIndex; + } + + protected void addDocsToItemWriter(List leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException { + try { + for (int i = outDocsIndex; i >= 0; --i) { + SortDoc s = docsToExport[i]; + writer.add((MapWriter) ew -> { + writeDoc(s, leaves, ew); + s.reset(); + }); + } + } catch (Throwable e) { + Throwable ex = e; + while (ex != null) { + String m = ex.getMessage(); + if (m != null && m.contains("Broken pipe")) { + throw new IgnoreException(); + } + ex = ex.getCause(); + } + + if (e instanceof IOException) { + throw ((IOException) e); + } else { + throw new IOException(e); + } + } + } + protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException { - //Write the data. List leaves = req.getSearcher().getTopReaderContext().leaves(); SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort()); int count = 0; - int queueSize = 30000; - if (totalHits < 30000) { - queueSize = totalHits; - } + final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits); + SortQueue queue = new SortQueue(queueSize, sortDoc); SortDoc[] outDocs = new SortDoc[queueSize]; while (count < totalHits) { - //long begin = System.nanoTime(); - queue.reset(); - SortDoc top = queue.top(); - for (int i = 0; i < leaves.size(); i++) { - sortDoc.setNextReader(leaves.get(i)); - DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here - int docId; - while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - sortDoc.setValues(docId); - if (top.lessThan(sortDoc)) { - top.setValues(sortDoc); - top = queue.updateTop(); - } - } - } - - int outDocsIndex = -1; - - for (int i = 0; i < queueSize; i++) { - SortDoc s = queue.pop(); - if (s.docId > -1) { - outDocs[++outDocsIndex] = s; - } - } - - //long end = System.nanoTime(); + identifyLowestSortingUnexportedDocs(leaves, sortDoc, queue); + int outDocsIndex = transferBatchToArrayForOutput(queue, outDocs); count += (outDocsIndex + 1); - - try { - for (int i = outDocsIndex; i >= 0; --i) { - SortDoc s = outDocs[i]; - writer.add((MapWriter) ew -> { - writeDoc(s, leaves, ew); - s.reset(); - }); - } - } catch (Throwable e) { - Throwable ex = e; - while (ex != null) { - String m = ex.getMessage(); - if (m != null && m.contains("Broken pipe")) { - throw new IgnoreException(); - } - ex = ex.getCause(); - } - - if (e instanceof IOException) { - throw ((IOException) e); - } else { - throw new IOException(e); - } - } + addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex); } }