Minor cleanup to ExportWriter

This commit is contained in:
Jason Gerlowski 2019-02-21 15:52:32 -05:00
parent f5a4159d75
commit 5ab5ba773a
1 changed files with 75 additions and 56 deletions

View File

@ -69,7 +69,21 @@ import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.solr.common.util.Utils.makeMap;
/**
* Prepares and writes the documents requested by /export requests
*
* {@link ExportWriter} gathers and sorts the documents for a core using "stream sorting".
* <p>
* Stream sorting works by repeatedly processing and modifying a bitmap of matching documents. Each pass over the
* bitmap identifies the smallest {@link #DOCUMENT_BATCH_SIZE} docs that haven't been sent yet and stores them in a
* Priority Queue. They are then exported (written across the wire) and marked as sent (unset in the bitmap).
* This process repeats until all matching documents have been sent.
* <p>
* This streaming approach is light on memory (only {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at
* once), and it allows {@link ExportWriter} to scale well with regard to numDocs.
*/
public class ExportWriter implements SolrCore.RawWriter, Closeable {
private static final int DOCUMENT_BATCH_SIZE = 30000;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private OutputStreamWriter respWriter;
final SolrQueryRequest req;
@ -211,72 +225,77 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
protected void identifyLowestSortingUnexportedDocs(List<LeafReaderContext> leaves, SortDoc sortDoc, SortQueue queue) throws IOException {
queue.reset();
SortDoc top = queue.top();
for (int i = 0; i < leaves.size(); i++) {
sortDoc.setNextReader(leaves.get(i));
DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here
int docId;
while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
sortDoc.setValues(docId);
if (top.lessThan(sortDoc)) {
top.setValues(sortDoc);
top = queue.updateTop();
}
}
}
}
protected int transferBatchToArrayForOutput(SortQueue queue, SortDoc[] destinationArr) {
int outDocsIndex = -1;
for (int i = 0; i < queue.maxSize; i++) {
SortDoc s = queue.pop();
if (s.docId > -1) {
destinationArr[++outDocsIndex] = s;
}
}
return outDocsIndex;
}
protected void addDocsToItemWriter(List<LeafReaderContext> leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException {
try {
for (int i = outDocsIndex; i >= 0; --i) {
SortDoc s = docsToExport[i];
writer.add((MapWriter) ew -> {
writeDoc(s, leaves, ew);
s.reset();
});
}
} catch (Throwable e) {
Throwable ex = e;
while (ex != null) {
String m = ex.getMessage();
if (m != null && m.contains("Broken pipe")) {
throw new IgnoreException();
}
ex = ex.getCause();
}
if (e instanceof IOException) {
throw ((IOException) e);
} else {
throw new IOException(e);
}
}
}
protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
//Write the data.
List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
int count = 0;
int queueSize = 30000;
if (totalHits < 30000) {
queueSize = totalHits;
}
final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
SortQueue queue = new SortQueue(queueSize, sortDoc);
SortDoc[] outDocs = new SortDoc[queueSize];
while (count < totalHits) {
//long begin = System.nanoTime();
queue.reset();
SortDoc top = queue.top();
for (int i = 0; i < leaves.size(); i++) {
sortDoc.setNextReader(leaves.get(i));
DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here
int docId;
while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
sortDoc.setValues(docId);
if (top.lessThan(sortDoc)) {
top.setValues(sortDoc);
top = queue.updateTop();
}
}
}
int outDocsIndex = -1;
for (int i = 0; i < queueSize; i++) {
SortDoc s = queue.pop();
if (s.docId > -1) {
outDocs[++outDocsIndex] = s;
}
}
//long end = System.nanoTime();
identifyLowestSortingUnexportedDocs(leaves, sortDoc, queue);
int outDocsIndex = transferBatchToArrayForOutput(queue, outDocs);
count += (outDocsIndex + 1);
try {
for (int i = outDocsIndex; i >= 0; --i) {
SortDoc s = outDocs[i];
writer.add((MapWriter) ew -> {
writeDoc(s, leaves, ew);
s.reset();
});
}
} catch (Throwable e) {
Throwable ex = e;
while (ex != null) {
String m = ex.getMessage();
if (m != null && m.contains("Broken pipe")) {
throw new IgnoreException();
}
ex = ex.getCause();
}
if (e instanceof IOException) {
throw ((IOException) e);
} else {
throw new IOException(e);
}
}
addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
}
}