SOLR-14608: Faster sorting for the /export handler

This commit is contained in:
Joel Bernstein 2021-01-19 14:08:38 -05:00
parent 1aeb1dcb86
commit 64df5a65f0
24 changed files with 740 additions and 299 deletions

View File

@ -19,45 +19,17 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.common.MapWriter;
import org.apache.solr.schema.FieldType;
class BoolFieldWriter extends FieldWriter {
private String field;
private FieldType fieldType;
private CharsRefBuilder cref = new CharsRefBuilder();
class BoolFieldWriter extends StringFieldWriter {
public BoolFieldWriter(String field, FieldType fieldType) {
this.field = field;
this.fieldType = fieldType;
}
public boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
BytesRef ref;
SortValue sortValue = sortDoc.getSortValue(this.field);
if (sortValue != null) {
if (sortValue.isPresent()) {
ref = (BytesRef) sortValue.getCurrentValue();
} else { //empty-value
return false;
}
} else {
// field is not part of 'sort' param, but part of 'fl' param
SortedDocValues vals = DocValues.getSorted(reader, this.field);
if (vals.advance(sortDoc.docId) != sortDoc.docId) {
return false;
}
int ord = vals.ordValue();
ref = vals.lookupOrd(ord);
super(field, fieldType);
}
protected void writeBytes(MapWriter.EntryWriter ew, BytesRef ref, FieldType fieldType) throws IOException {
fieldType.indexedToReadable(ref, cref);
ew.put(this.field, "true".equals(cref.toString()));
return true;
}
}

View File

@ -20,19 +20,22 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import java.util.Date;
import com.carrotsearch.hppc.IntObjectHashMap;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.solr.common.MapWriter;
class DateFieldWriter extends FieldWriter {
private String field;
private IntObjectHashMap<NumericDocValues> docValuesCache = new IntObjectHashMap<>();
public DateFieldWriter(String field) {
this.field = field;
}
public boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
public boolean write(SortDoc sortDoc, LeafReaderContext readerContext, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
Long val;
SortValue sortValue = sortDoc.getSortValue(this.field);
if (sortValue != null) {
@ -43,7 +46,21 @@ class DateFieldWriter extends FieldWriter {
}
} else {
// field is not part of 'sort' param, but part of 'fl' param
NumericDocValues vals = DocValues.getNumeric(reader, this.field);
int readerOrd = readerContext.ord;
NumericDocValues vals = null;
if(docValuesCache.containsKey(readerOrd)) {
NumericDocValues numericDocValues = docValuesCache.get(readerOrd);
if(numericDocValues.docID() < sortDoc.docId) {
//We have not advanced beyond the current docId so we can use this docValues.
vals = numericDocValues;
}
}
if(vals == null) {
vals = DocValues.getNumeric(readerContext.reader(), this.field);
docValuesCache.put(readerOrd, vals);
}
if (vals.advance(sortDoc.docId) == sortDoc.docId) {
val = vals.longValue();
} else {

View File

@ -19,19 +19,21 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import com.carrotsearch.hppc.IntObjectHashMap;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.solr.common.MapWriter;
class DoubleFieldWriter extends FieldWriter {
private String field;
private IntObjectHashMap<NumericDocValues> docValuesCache = new IntObjectHashMap<>();
public DoubleFieldWriter(String field) {
this.field = field;
}
public boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
public boolean write(SortDoc sortDoc, LeafReaderContext readerContext, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
SortValue sortValue = sortDoc.getSortValue(this.field);
if (sortValue != null) {
if (sortValue.isPresent()) {
@ -43,7 +45,20 @@ class DoubleFieldWriter extends FieldWriter {
}
} else {
// field is not part of 'sort' param, but part of 'fl' param
NumericDocValues vals = DocValues.getNumeric(reader, this.field);
int readerOrd = readerContext.ord;
NumericDocValues vals = null;
if(docValuesCache.containsKey(readerOrd)) {
NumericDocValues numericDocValues = docValuesCache.get(readerOrd);
if(numericDocValues.docID() < sortDoc.docId) {
//We have not advanced beyond the current docId so we can use this docValues.
vals = numericDocValues;
}
}
if(vals == null) {
vals = DocValues.getNumeric(readerContext.reader(), this.field);
docValuesCache.put(readerOrd, vals);
}
if (vals.advance(sortDoc.docId) == sortDoc.docId) {
long val = vals.longValue();
ew.put(this.field, Double.longBitsToDouble(val));

View File

@ -78,13 +78,17 @@ class DoubleValue implements SortValue {
}
}
public void toGlobalValue(SortValue previousValue) {
}
@Override
public boolean isPresent() {
return present;
}
public void setCurrentValue(SortValue sv) {
DoubleValue dv = (DoubleValue)sv;
DoubleValue dv = (DoubleValue) sv;
this.currentValue = dv.currentValue;
this.present = dv.present;
}
@ -95,7 +99,7 @@ class DoubleValue implements SortValue {
}
public int compareTo(SortValue o) {
DoubleValue dv = (DoubleValue)o;
DoubleValue dv = (DoubleValue) o;
return comp.compare(currentValue, dv.currentValue);
}
}

View File

@ -25,6 +25,7 @@ class DoubleValueSortDoc extends SingleValueSortDoc {
protected SortValue value2;
@Override
public SortValue getSortValue(String field) {
if (value1.getField().equals(field)) {
return value1;
@ -34,6 +35,7 @@ class DoubleValueSortDoc extends SingleValueSortDoc {
return null;
}
@Override
public void setNextReader(LeafReaderContext context) throws IOException {
this.ord = context.ord;
this.docBase = context.docBase;
@ -41,6 +43,7 @@ class DoubleValueSortDoc extends SingleValueSortDoc {
value2.setNextReader(context);
}
@Override
public void reset() {
this.docId = -1;
this.docBase = -1;
@ -49,18 +52,27 @@ class DoubleValueSortDoc extends SingleValueSortDoc {
value2.reset();
}
@Override
public void setValues(int docId) throws IOException {
this.docId = docId;
value1.setCurrentValue(docId);
value2.setCurrentValue(docId);
}
@Override
public void setGlobalValues(SortDoc previous) {
DoubleValueSortDoc doubleValueSortDoc = (DoubleValueSortDoc) previous;
value1.toGlobalValue(doubleValueSortDoc.value1);
value2.toGlobalValue(doubleValueSortDoc.value2);
}
@Override
public void setValues(SortDoc sortDoc) {
this.docId = sortDoc.docId;
this.ord = sortDoc.ord;
this.docBase = sortDoc.docBase;
value1.setCurrentValue(((DoubleValueSortDoc)sortDoc).value1);
value2.setCurrentValue(((DoubleValueSortDoc)sortDoc).value2);
value1.setCurrentValue(((DoubleValueSortDoc) sortDoc).value1);
value2.setCurrentValue(((DoubleValueSortDoc) sortDoc).value2);
}
public DoubleValueSortDoc(SortValue value1, SortValue value2) {
@ -68,34 +80,42 @@ class DoubleValueSortDoc extends SingleValueSortDoc {
this.value2 = value2;
}
@Override
public SortDoc copy() {
return new DoubleValueSortDoc(value1.copy(), value2.copy());
}
@Override
public boolean lessThan(Object o) {
DoubleValueSortDoc sd = (DoubleValueSortDoc)o;
DoubleValueSortDoc sd = (DoubleValueSortDoc) o;
int comp = value1.compareTo(sd.value1);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
comp = value2.compareTo(sd.value2);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
return docId+docBase > sd.docId+sd.docBase;
return docId + docBase > sd.docId + sd.docBase;
}
}
}
public int compareTo(Object o) {
DoubleValueSortDoc sd = (DoubleValueSortDoc)o;
@Override
public int compareTo(SortDoc o) {
DoubleValueSortDoc sd = (DoubleValueSortDoc) o;
int comp = value1.compareTo(sd.value1);
if (comp == 0) {
return value2.compareTo(sd.value2);
comp = value2.compareTo(sd.value2);
if (comp == 0) {
return (sd.docId + sd.docBase) - (docId + docBase);
} else {
return comp;
}
} else {
return comp;
}

View File

@ -26,14 +26,16 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.BrokenBarrierException;
import com.codahale.metrics.Timer;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.FixedBitSet;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.handler.export.ExportWriter.MergeIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,9 +53,6 @@ class ExportBuffers {
final List<LeafReaderContext> leaves;
final ExportWriter exportWriter;
final OutputStream os;
final Timer writeOutputBufferTimer;
final Timer fillerWaitTimer;
final Timer writerWaitTimer;
final IteratorWriter.ItemWriter rawWriter;
final IteratorWriter.ItemWriter writer;
final CyclicBarrier barrier;
@ -68,7 +67,7 @@ class ExportBuffers {
ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits,
Timer writeOutputBufferTimer, Timer fillerWaitTimer, Timer writerWaitTimer) throws IOException {
FixedBitSet[] sets) throws IOException {
this.exportWriter = exportWriter;
this.leaves = leaves;
this.os = os;
@ -81,55 +80,62 @@ class ExportBuffers {
return this;
}
};
this.writeOutputBufferTimer = writeOutputBufferTimer;
this.fillerWaitTimer = fillerWaitTimer;
this.writerWaitTimer = writerWaitTimer;
this.bufferOne = new Buffer(queueSize);
this.bufferTwo = new Buffer(queueSize);
this.totalHits = totalHits;
fillBuffer = bufferOne;
outputBuffer = bufferTwo;
SortDoc writerSortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
MergeIterator mergeIterator = exportWriter.getMergeIterator(leaves, sets, writerSortDoc);
bufferOne.initialize(writerSortDoc);
bufferTwo.initialize(writerSortDoc);
barrier = new CyclicBarrier(2, () -> swapBuffers());
filler = () -> {
try {
// log.debug("--- filler start {}", Thread.currentThread());
SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
Buffer buffer = getFillBuffer();
SortQueue queue = new SortQueue(queueSize, sortDoc);
long lastOutputCounter = 0;
for (int count = 0; count < totalHits; ) {
// log.debug("--- filler fillOutDocs in {}", fillBuffer);
exportWriter.fillOutDocs(leaves, sortDoc, queue, buffer);
exportWriter.fillOutDocs(mergeIterator, buffer);
count += (buffer.outDocsIndex + 1);
// log.debug("--- filler count={}, exchange buffer from {}", count, buffer);
Timer.Context timerContext = getFillerWaitTimer().time();
try {
long startBufferWait = System.nanoTime();
exchangeBuffers();
} finally {
timerContext.stop();
long endBufferWait = System.nanoTime();
if(log.isDebugEnabled()) {
log.debug("Waited for writer thread:{}", Long.toString(((endBufferWait - startBufferWait) / 1000000)));
}
} finally {
}
buffer = getFillBuffer();
if (outputCounter.longValue() > lastOutputCounter) {
lastOutputCounter = outputCounter.longValue();
flushOutput();
}
// log.debug("--- filler got empty buffer {}", buffer);
}
buffer.outDocsIndex = Buffer.NO_MORE_DOCS;
// log.debug("--- filler final exchange buffer from {}", buffer);
Timer.Context timerContext = getFillerWaitTimer().time();
try {
exchangeBuffers();
} finally {
timerContext.stop();
}
buffer = getFillBuffer();
// log.debug("--- filler final got buffer {}", buffer);
} catch (Throwable e) {
if(!(e instanceof InterruptedException) && !(e instanceof BrokenBarrierException)) {
/*
Don't log the interrupt or BrokenBarrierException as it creates noise during early client disconnects and
doesn't log anything particularly useful in other situations.
*/
log.error("filler", e);
}
error(e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
@ -155,7 +161,7 @@ class ExportBuffers {
}
private void swapBuffers() {
log.debug("--- swap buffers");
//log.debug("--- swap buffers");
Buffer one = fillBuffer;
fillBuffer = outputBuffer;
outputBuffer = one;
@ -174,18 +180,6 @@ class ExportBuffers {
return fillBuffer;
}
public Timer getWriteOutputBufferTimer() {
return writeOutputBufferTimer;
}
public Timer getFillerWaitTimer() {
return fillerWaitTimer;
}
public Timer getWriterWaitTimer() {
return writerWaitTimer;
}
// decorated writer that keeps track of number of writes
public IteratorWriter.ItemWriter getWriter() {
return writer;
@ -230,8 +224,24 @@ class ExportBuffers {
// );
// allDone.join();
log.debug("-- finished.");
} catch (Exception e) {
} catch (Throwable e) {
Throwable ex = e;
boolean ignore = false;
while (ex != null) {
String m = ex.getMessage();
if (m != null && m.contains("Broken pipe")) {
ignore = true;
break;
}
ex = ex.getCause();
}
if(!ignore) {
/*
Ignore Broken pipes. Broken pipes occur normally when using the export handler for
merge joins when the join is complete before both sides of the join are fully read.
*/
log.error("Exception running filler / writer", e);
}
error(e);
//
} finally {

View File

@ -25,8 +25,8 @@ import java.io.PrintWriter;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.TreeSet;
import com.codahale.metrics.Timer;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
@ -90,15 +90,16 @@ import static org.apache.solr.common.util.Utils.makeMap;
* bitmap identifies the smallest docs (default is {@link #DEFAULT_BATCH_SIZE}) that haven't been sent yet and stores them in a
* Priority Queue. They are then exported (written across the wire) and marked as sent (unset in the bitmap).
* This process repeats until all matching documents have been sent.
* <p>
* This streaming approach is light on memory (only up to 2x batch size documents are ever stored in memory at
* once), and it allows {@link ExportWriter} to scale well with regard to numDocs.
*/
public class ExportWriter implements SolrCore.RawWriter, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String BATCH_SIZE_PARAM = "batchSize";
public static final String QUEUE_SIZE_PARAM = "queueSize";
public static final int DEFAULT_BATCH_SIZE = 30000;
public static final int DEFAULT_QUEUE_SIZE = 150000;
private OutputStreamWriter respWriter;
final SolrQueryRequest req;
@ -106,7 +107,10 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
final StreamContext initialStreamContext;
final SolrMetricsContext solrMetricsContext;
final String metricsPath;
//The batch size for the output writer thread.
final int batchSize;
//The max combined size of the segment level priority queues.
private int priorityQueueSize;
StreamExpression streamExpression;
StreamContext streamContext;
FieldWriter[] fieldWriters;
@ -114,11 +118,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
FixedBitSet[] sets = null;
PushWriter writer;
private String wt;
final Timer identifyLowestSortingDocTimer;
final Timer transferBatchToBufferTimer;
final Timer writeOutputBufferTimer;
final Timer writerWaitTimer;
final Timer fillerWaitTimer;
public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
@ -130,12 +130,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
this.initialStreamContext = initialStreamContext;
this.solrMetricsContext = solrMetricsContext;
this.metricsPath = metricsPath;
this.batchSize = req.getParams().getInt(BATCH_SIZE_PARAM, DEFAULT_BATCH_SIZE);
identifyLowestSortingDocTimer = solrMetricsContext.timer("identifyLowestSortingDoc", metricsPath);
transferBatchToBufferTimer = solrMetricsContext.timer("transferBatchToBuffer", metricsPath);
writeOutputBufferTimer = solrMetricsContext.timer("writeOutputBuffer", metricsPath);
writerWaitTimer = solrMetricsContext.timer("writerWaitTimer", metricsPath);
fillerWaitTimer = solrMetricsContext.timer("fillerWaitTimer", metricsPath);
this.priorityQueueSize = req.getParams().getInt(QUEUE_SIZE_PARAM, DEFAULT_QUEUE_SIZE);
this.batchSize = DEFAULT_BATCH_SIZE;
}
@Override
@ -147,10 +143,20 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
@Override
public void close() throws IOException {
if (writer != null) writer.close();
if (writer != null) {
try {
writer.close();
} catch (Throwable t) {
//We're going to sit on this.
}
}
if (respWriter != null) {
try {
respWriter.flush();
respWriter.close();
} catch (Throwable t) {
}
}
}
@ -168,6 +174,14 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
public void write(OutputStream os) throws IOException {
try {
_write(os);
} finally {
}
}
private void _write(OutputStream os) throws IOException {
QueryResponseWriter rw = req.getCore().getResponseWriters().get(wt);
if (rw instanceof BinaryResponseWriter) {
//todo add support for other writers after testing
@ -281,6 +295,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
streamContext.put(CommonParams.SORT, params.get(CommonParams.SORT));
}
try {
writer.writeMap(m -> {
m.put("responseHeader", singletonMap("status", 0));
m.put("response", (MapWriter) mw -> {
@ -288,6 +303,10 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
});
});
} catch (java.io.EOFException e) {
log.info("Caught Eof likely caused by early client disconnect");
}
if (streamContext != null) {
streamContext = null;
}
@ -302,41 +321,16 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return tupleStream;
}
private void identifyLowestSortingUnexportedDocs(List<LeafReaderContext> leaves, SortDoc sortDoc, SortQueue queue) throws IOException {
Timer.Context timerContext = identifyLowestSortingDocTimer.time();
try {
queue.reset();
SortDoc top = queue.top();
for (int i = 0; i < leaves.size(); i++) {
sortDoc.setNextReader(leaves.get(i));
DocIdSetIterator it = new BitSetIterator(sets[i], 0); // cost is not useful here
int docId;
while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
sortDoc.setValues(docId);
if (top.lessThan(sortDoc)) {
top.setValues(sortDoc);
top = queue.updateTop();
}
}
}
} finally {
timerContext.stop();
}
}
private void transferBatchToBufferForOutput(SortQueue queue,
List<LeafReaderContext> leaves,
private void transferBatchToBufferForOutput(MergeIterator mergeIterator,
ExportBuffers.Buffer destination) throws IOException {
Timer.Context timerContext = transferBatchToBufferTimer.time();
try {
int outDocsIndex = -1;
for (int i = 0; i < queue.maxSize; i++) {
SortDoc s = queue.pop();
if (s.docId > -1) {
destination.outDocs[++outDocsIndex].setValues(s);
// remove this doc id from the matching bitset, it's been exported
sets[s.ord].clear(s.docId);
s.reset(); // reuse
for (int i = 0; i < batchSize; i++) {
SortDoc sortDoc = mergeIterator.next();
if (sortDoc != null) {
destination.outDocs[++outDocsIndex].setValues(sortDoc);
} else {
break;
}
}
destination.outDocsIndex = outDocsIndex;
@ -347,7 +341,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
throw t;
} finally {
timerContext.stop();
}
}
@ -355,8 +349,17 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
final int queueSize = Math.min(batchSize, totalHits);
ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), os, writer, sort, queueSize, totalHits,
writeOutputBufferTimer, fillerWaitTimer, writerWaitTimer);
ExportBuffers buffers = new ExportBuffers(this,
leaves,
req.getSearcher(),
os,
writer,
sort,
queueSize,
totalHits,
sets);
if (streamExpression != null) {
streamContext.put(ExportBuffers.EXPORT_BUFFERS_KEY, buffers);
@ -408,9 +411,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
log.debug("--- writer interrupted");
break;
}
Timer.Context timerContext = writeOutputBufferTimer.time();
try {
for (int i = buffer.outDocsIndex; i >= 0; --i) {
for (int i = 0; i <= buffer.outDocsIndex; ++i) {
// we're using the raw writer here because there's no potential
// reduction in the number of output items, unlike when using
// streaming expressions
@ -418,27 +420,28 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
writer.add((MapWriter) ew -> writeDoc(currentDoc, leaves, ew, fieldWriters));
}
} finally {
timerContext.stop();
}
log.debug("--- writer exchanging from {}", buffer);
timerContext = writerWaitTimer.time();
//log.debug("--- writer exchanging from {}", buffer);
try {
long startExchangeBuffers = System.nanoTime();
buffers.exchangeBuffers();
long endExchangeBuffers = System.nanoTime();
if (log.isDebugEnabled()) {
log.debug("Waited for reader thread {}:", Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000)));
}
} finally {
timerContext.stop();
}
buffer = buffers.getOutputBuffer();
log.debug("--- writer got {}", buffer);
//log.debug("--- writer got {}", buffer);
}
return true;
});
}
}
void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
SortQueue sortQueue, ExportBuffers.Buffer buffer) throws IOException {
identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
transferBatchToBufferForOutput(sortQueue, leaves, buffer);
void fillOutDocs(MergeIterator mergeIterator,
ExportBuffers.Buffer buffer) throws IOException {
transferBatchToBufferForOutput(mergeIterator, buffer);
}
void writeDoc(SortDoc sortDoc,
@ -448,7 +451,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
LeafReaderContext context = leaves.get(ord);
int fieldIndex = 0;
for (FieldWriter fieldWriter : writers) {
if (fieldWriter.write(sortDoc, context.reader(), ew, fieldIndex)) {
if (fieldWriter.write(sortDoc, context, ew, fieldIndex)) {
++fieldIndex;
}
}
@ -612,6 +615,183 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return new SortDoc(sortValues);
}
static class MergeIterator {
private TreeSet<SortDoc> set = new TreeSet<>();
private SegmentIterator[] segmentIterators;
private SortDoc outDoc;
public MergeIterator(SegmentIterator[] segmentIterators, SortDoc proto) throws IOException {
outDoc = proto.copy();
this.segmentIterators = segmentIterators;
for (int i = 0; i < segmentIterators.length; i++) {
try {
SortDoc sortDoc = segmentIterators[i].next();
if (sortDoc != null) {
set.add(sortDoc);
}
} catch (IOException e) {
log.error("Error in MergeIterator: ", e);
throw e;
}
}
}
/*
* Merge sorts the SortDocs from Segment Iterators
* Returns null when all docs are iterated.
*/
public SortDoc next() throws IOException {
SortDoc sortDoc = set.pollLast();
//We've exhausted all documents
if (sortDoc == null) {
return null;
} else {
outDoc.setValues(sortDoc);
}
SortDoc nextDoc = segmentIterators[sortDoc.ord].next();
if (nextDoc != null) {
//The entire expense of the operation is here
set.add(nextDoc);
}
return outDoc;
}
}
public MergeIterator getMergeIterator(List<LeafReaderContext> leaves, FixedBitSet[] bits, SortDoc sortDoc) throws IOException {
try {
long totalDocs = 0;
for (int i = 0; i < leaves.size(); i++) {
totalDocs += leaves.get(i).reader().maxDoc();
}
//Resize the priorityQueueSize down for small result sets.
this.priorityQueueSize = Math.min(this.priorityQueueSize, (int)(this.totalHits*1.5));
if(log.isDebugEnabled()) {
log.debug("Total priority queue size {}:", this.priorityQueueSize);
}
int[] sizes = new int[leaves.size()];
int combineQueueSize = 0;
for (int i = 0; i < leaves.size(); i++) {
long maxDoc = leaves.get(i).reader().maxDoc();
int sortQueueSize = Math.min((int) (((double) maxDoc / (double) totalDocs) * this.priorityQueueSize), batchSize);
//Protect against too small a queue size as well
if(sortQueueSize < 10) {
sortQueueSize = 10;
}
if(log.isDebugEnabled()) {
log.debug("Segment priority queue size {}:", sortQueueSize);
}
sizes[i] = sortQueueSize;
combineQueueSize += sortQueueSize;
}
if(log.isDebugEnabled()) {
log.debug("Combined priority queue size {}:", combineQueueSize);
}
SegmentIterator[] segmentIterators = new SegmentIterator[leaves.size()];
for (int i = 0; i < segmentIterators.length; i++) {
SortQueue sortQueue = new SortQueue(sizes[i], sortDoc.copy());
segmentIterators[i] = new SegmentIterator(bits[i], leaves.get(i), sortQueue, sortDoc.copy());
}
return new MergeIterator(segmentIterators, sortDoc);
} finally {
}
}
private static class SegmentIterator {
private final FixedBitSet bits;
private final SortQueue queue;
private final SortDoc sortDoc;
private final LeafReaderContext context;
private final SortDoc[] outDocs;
private SortDoc nextDoc;
private int index;
public SegmentIterator(FixedBitSet bits, LeafReaderContext context, SortQueue sortQueue, SortDoc sortDoc) throws IOException {
this.bits = bits;
this.queue = sortQueue;
this.sortDoc = sortDoc;
this.nextDoc = sortDoc.copy();
this.context = context;
this.outDocs = new SortDoc[sortQueue.maxSize];
topDocs();
}
public SortDoc next() throws IOException {
SortDoc _sortDoc = null;
if (index > -1) {
_sortDoc = outDocs[index--];
} else {
topDocs();
if (index > -1) {
_sortDoc = outDocs[index--];
}
}
if (_sortDoc != null) {
//Clear the bit so it's not loaded again.
bits.clear(_sortDoc.docId);
//Load the global ordinal (only matters for strings)
_sortDoc.setGlobalValues(nextDoc);
nextDoc.setValues(_sortDoc);
//We are now done with this doc.
_sortDoc.reset();
} else {
nextDoc = null;
}
return nextDoc;
}
private void topDocs() throws IOException {
try {
queue.reset();
SortDoc top = queue.top();
this.sortDoc.setNextReader(context);
DocIdSetIterator it = new BitSetIterator(bits, 0); // cost is not useful here
int docId;
while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
this.sortDoc.setValues(docId);
if (top.lessThan(this.sortDoc)) {
top.setValues(this.sortDoc);
top = queue.updateTop();
}
}
//Pop the queue and load up the array.
index = -1;
SortDoc _sortDoc;
while ((_sortDoc = queue.pop()) != null) {
if (_sortDoc.docId > -1) {
outDocs[++index] = _sortDoc;
}
}
} catch (Exception e) {
log.error("Segment Iterator Error:", e);
throw new IOException(e);
} finally {
}
}
}
public static class IgnoreException extends IOException {
public void printStackTrace(PrintWriter pw) {
pw.print("Early Client Disconnect");
@ -621,5 +801,4 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return "Early Client Disconnect";
}
}
}

View File

@ -25,7 +25,6 @@ import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeoutException;
import com.codahale.metrics.Timer;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@ -56,9 +55,9 @@ public class ExportWriterStream extends TupleStream implements Expressible {
StreamContext context;
StreamComparator streamComparator;
int pos = -1;
int index = -1;
ExportBuffers exportBuffers;
ExportBuffers.Buffer buffer;
Timer.Context writeOutputTimerContext;
private static final class TupleEntryWriter implements EntryWriter {
Tuple tuple;
@ -131,9 +130,7 @@ public class ExportWriterStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
if (writeOutputTimerContext != null) {
writeOutputTimerContext.stop();
}
exportBuffers = null;
}
@ -141,18 +138,19 @@ public class ExportWriterStream extends TupleStream implements Expressible {
public Tuple read() throws IOException {
Tuple res = null;
if (pos < 0) {
if (writeOutputTimerContext != null) {
writeOutputTimerContext.stop();
writeOutputTimerContext = null;
}
try {
buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
log.debug("--- ews exchange empty buffer {}", buffer);
//log.debug("--- ews exchange empty buffer {}", buffer);
boolean exchanged = false;
while (!exchanged) {
Timer.Context timerContext = exportBuffers.getWriterWaitTimer().time();
try {
long startExchangeBuffers = System.nanoTime();
exportBuffers.exchangeBuffers();
long endExchangeBuffers = System.nanoTime();
if(log.isDebugEnabled()) {
log.debug("Waited for reader thread:{}", Long.toString(((endExchangeBuffers - startExchangeBuffers) / 1000000)));
}
exchanged = true;
} catch (TimeoutException e) {
log.debug("--- ews timeout loop");
@ -175,7 +173,6 @@ public class ExportWriterStream extends TupleStream implements Expressible {
}
break;
} finally {
timerContext.stop();
}
}
} catch (InterruptedException e) {
@ -196,6 +193,7 @@ public class ExportWriterStream extends TupleStream implements Expressible {
res = Tuple.EOF();
} else {
pos = buffer.outDocsIndex;
index = -1; //restart index.
log.debug("--- ews new pos={}", pos);
}
}
@ -205,15 +203,11 @@ public class ExportWriterStream extends TupleStream implements Expressible {
}
if (res != null) {
// only errors or EOF assigned result so far
if (writeOutputTimerContext != null) {
writeOutputTimerContext.stop();
}
return res;
}
if (writeOutputTimerContext == null) {
writeOutputTimerContext = exportBuffers.getWriteOutputBufferTimer().time();
}
SortDoc sortDoc = buffer.outDocs[pos];
SortDoc sortDoc = buffer.outDocs[++index];
tupleEntryWriter.tuple = new Tuple();
exportBuffers.exportWriter.writeDoc(sortDoc, exportBuffers.leaves, tupleEntryWriter, exportBuffers.exportWriter.fieldWriters);
pos--;

View File

@ -19,9 +19,9 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.solr.common.MapWriter;
abstract class FieldWriter {
public abstract boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter out, int fieldIndex) throws IOException;
public abstract boolean write(SortDoc sortDoc, LeafReaderContext readerContext, MapWriter.EntryWriter out, int fieldIndex) throws IOException;
}

View File

@ -19,19 +19,21 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import com.carrotsearch.hppc.IntObjectHashMap;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.solr.common.MapWriter;
class FloatFieldWriter extends FieldWriter {
private String field;
private IntObjectHashMap<NumericDocValues> docValuesCache = new IntObjectHashMap<>();
public FloatFieldWriter(String field) {
this.field = field;
}
public boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
public boolean write(SortDoc sortDoc, LeafReaderContext readerContext, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
SortValue sortValue = sortDoc.getSortValue(this.field);
if (sortValue != null) {
if (sortValue.isPresent()) {
@ -43,7 +45,21 @@ class FloatFieldWriter extends FieldWriter {
}
} else {
// field is not part of 'sort' param, but part of 'fl' param
NumericDocValues vals = DocValues.getNumeric(reader, this.field);
int readerOrd = readerContext.ord;
NumericDocValues vals = null;
if(docValuesCache.containsKey(readerOrd)) {
NumericDocValues numericDocValues = docValuesCache.get(readerOrd);
if(numericDocValues.docID() < sortDoc.docId) {
//We have not advanced beyond the current docId so we can use this docValues.
vals = numericDocValues;
}
}
if(vals == null) {
vals = DocValues.getNumeric(readerContext.reader(), this.field);
docValuesCache.put(readerOrd, vals);
}
if (vals.advance(sortDoc.docId) == sortDoc.docId) {
int val = (int) vals.longValue();
ew.put(this.field, Float.intBitsToFloat(val));

View File

@ -44,6 +44,10 @@ class FloatValue implements SortValue {
return currentValue;
}
public void toGlobalValue(SortValue previousValue) {
}
public String getField() {
return field;
}
@ -81,7 +85,7 @@ class FloatValue implements SortValue {
}
public void setCurrentValue(SortValue sv) {
FloatValue fv = (FloatValue)sv;
FloatValue fv = (FloatValue) sv;
this.currentValue = fv.currentValue;
this.present = fv.present;
}
@ -92,7 +96,7 @@ class FloatValue implements SortValue {
}
public int compareTo(SortValue o) {
FloatValue fv = (FloatValue)o;
FloatValue fv = (FloatValue) o;
return comp.compare(currentValue, fv.currentValue);
}
}

View File

@ -19,19 +19,21 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import com.carrotsearch.hppc.IntObjectHashMap;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.solr.common.MapWriter;
class IntFieldWriter extends FieldWriter {
private String field;
private IntObjectHashMap<NumericDocValues> docValuesCache = new IntObjectHashMap<>();
public IntFieldWriter(String field) {
this.field = field;
}
public boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
public boolean write(SortDoc sortDoc, LeafReaderContext readerContext, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
int val;
SortValue sortValue = sortDoc.getSortValue(this.field);
if (sortValue != null) {
@ -42,7 +44,21 @@ class IntFieldWriter extends FieldWriter {
}
} else {
// field is not part of 'sort' param, but part of 'fl' param
NumericDocValues vals = DocValues.getNumeric(reader, this.field);
int readerOrd = readerContext.ord;
NumericDocValues vals = null;
if(docValuesCache.containsKey(readerOrd)) {
NumericDocValues numericDocValues = docValuesCache.get(readerOrd);
if(numericDocValues.docID() < sortDoc.docId) {
//We have not advanced beyond the current docId so we can use this docValues.
vals = numericDocValues;
}
}
if(vals == null) {
vals = DocValues.getNumeric(readerContext.reader(), this.field);
docValuesCache.put(readerOrd, vals);
}
if (vals.advance(sortDoc.docId) == sortDoc.docId) {
val = (int) vals.longValue();
} else {

View File

@ -75,18 +75,22 @@ public class IntValue implements SortValue {
}
}
public void toGlobalValue(SortValue previousValue) {
}
@Override
public boolean isPresent() {
return this.present;
}
public int compareTo(SortValue o) {
IntValue iv = (IntValue)o;
IntValue iv = (IntValue) o;
return comp.compare(currentValue, iv.currentValue);
}
public void setCurrentValue(SortValue sv) {
IntValue iv = (IntValue)sv;
IntValue iv = (IntValue) sv;
this.currentValue = iv.currentValue;
this.present = iv.present;
}

View File

@ -19,19 +19,22 @@ package org.apache.solr.handler.export;
import java.io.IOException;
import com.carrotsearch.hppc.IntObjectHashMap;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.solr.common.MapWriter;
class LongFieldWriter extends FieldWriter {
private String field;
private IntObjectHashMap<NumericDocValues> docValuesCache = new IntObjectHashMap<>();
public LongFieldWriter(String field) {
this.field = field;
}
public boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
public boolean write(SortDoc sortDoc, LeafReaderContext readerContext, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
long val;
SortValue sortValue = sortDoc.getSortValue(this.field);
if (sortValue != null) {
@ -42,7 +45,21 @@ class LongFieldWriter extends FieldWriter {
}
} else {
// field is not part of 'sort' param, but part of 'fl' param
NumericDocValues vals = DocValues.getNumeric(reader, this.field);
int readerOrd = readerContext.ord;
NumericDocValues vals = null;
if(docValuesCache.containsKey(readerOrd)) {
NumericDocValues numericDocValues = docValuesCache.get(readerOrd);
if(numericDocValues.docID() < sortDoc.docId) {
//We have not advanced beyond the current docId so we can use this docValues.
vals = numericDocValues;
}
}
if(vals == null) {
vals = DocValues.getNumeric(readerContext.reader(), this.field);
docValuesCache.put(readerOrd, vals);
}
if (vals.advance(sortDoc.docId) == sortDoc.docId) {
val = vals.longValue();
} else {

View File

@ -52,6 +52,10 @@ public class LongValue implements SortValue {
return new LongValue(field, comp);
}
public void toGlobalValue(SortValue previousValue) {
}
public void setNextReader(LeafReaderContext context) throws IOException {
this.vals = DocValues.getNumeric(context.reader(), field);
lastDocID = 0;

View File

@ -21,11 +21,8 @@ import java.io.IOException;
import java.util.Date;
import java.util.function.LongFunction;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import com.carrotsearch.hppc.IntObjectHashMap;
import org.apache.lucene.index.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.lucene.util.NumericUtils;
@ -41,6 +38,8 @@ class MultiFieldWriter extends FieldWriter {
private boolean numeric;
private CharsRefBuilder cref = new CharsRefBuilder();
private final LongFunction<Object> bitsToValue;
private IntObjectHashMap<Object> docValuesCache = new IntObjectHashMap<>();
public MultiFieldWriter(String field, FieldType fieldType, SchemaField schemaField, boolean numeric) {
this.field = field;
@ -54,25 +53,59 @@ class MultiFieldWriter extends FieldWriter {
}
}
public boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter out, int fieldIndex) throws IOException {
public boolean write(SortDoc sortDoc, LeafReaderContext readerContext, MapWriter.EntryWriter out, int fieldIndex) throws IOException {
if (this.fieldType.isPointField()) {
SortedNumericDocValues vals = DocValues.getSortedNumeric(reader, this.field);
int readerOrd = readerContext.ord;
SortedNumericDocValues vals = null;
if(docValuesCache.containsKey(readerOrd)) {
SortedNumericDocValues sortedNumericDocValues = (SortedNumericDocValues) docValuesCache.get(readerOrd);
if(sortedNumericDocValues.docID() < sortDoc.docId) {
//We have not advanced beyond the current docId so we can use this docValues.
vals = sortedNumericDocValues;
}
}
if(vals == null) {
vals = DocValues.getSortedNumeric(readerContext.reader(), this.field);
docValuesCache.put(readerOrd, vals);
}
if (!vals.advanceExact(sortDoc.docId)) return false;
final SortedNumericDocValues docVals = vals;
out.put(this.field,
(IteratorWriter) w -> {
for (int i = 0, count = vals.docValueCount(); i < count; i++) {
w.add(bitsToValue.apply(vals.nextValue()));
for (int i = 0, count = docVals.docValueCount(); i < count; i++) {
w.add(bitsToValue.apply(docVals.nextValue()));
}
});
return true;
} else {
SortedSetDocValues vals = DocValues.getSortedSet(reader, this.field);
int readerOrd = readerContext.ord;
SortedSetDocValues vals = null;
if(docValuesCache.containsKey(readerOrd)) {
SortedSetDocValues sortedSetDocValues = (SortedSetDocValues) docValuesCache.get(readerOrd);
if(sortedSetDocValues.docID() < sortDoc.docId) {
//We have not advanced beyond the current docId so we can use this docValues.
vals = sortedSetDocValues;
}
}
if(vals == null) {
vals = DocValues.getSortedSet(readerContext.reader(), this.field);
docValuesCache.put(readerOrd, vals);
}
if (vals.advance(sortDoc.docId) != sortDoc.docId) return false;
final SortedSetDocValues docVals = vals;
out.put(this.field,
(IteratorWriter) w -> {
long o;
while((o = vals.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
BytesRef ref = vals.lookupOrd(o);
while((o = docVals.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
BytesRef ref = docVals.lookupOrd(o);
fieldType.indexedToReadable(ref, cref);
IndexableField f = fieldType.createField(schemaField, cref.toString());
if (f == null) w.add(cref.toString());

View File

@ -25,6 +25,7 @@ class QuadValueSortDoc extends TripleValueSortDoc {
protected SortValue value4;
@Override
public SortValue getSortValue(String field) {
if (value1.getField().equals(field)) {
return value1;
@ -38,6 +39,7 @@ class QuadValueSortDoc extends TripleValueSortDoc {
return null;
}
@Override
public void setNextReader(LeafReaderContext context) throws IOException {
this.ord = context.ord;
this.docBase = context.docBase;
@ -47,6 +49,17 @@ class QuadValueSortDoc extends TripleValueSortDoc {
value4.setNextReader(context);
}
@Override
public void setGlobalValues(SortDoc previous) {
QuadValueSortDoc quadValueSortDoc = (QuadValueSortDoc) previous;
value1.toGlobalValue(quadValueSortDoc.value1);
value2.toGlobalValue(quadValueSortDoc.value2);
value3.toGlobalValue(quadValueSortDoc.value3);
value4.toGlobalValue(quadValueSortDoc.value4);
}
@Override
public void reset() {
this.docId = -1;
this.docBase = -1;
@ -57,6 +70,7 @@ class QuadValueSortDoc extends TripleValueSortDoc {
value4.reset();
}
@Override
public void setValues(int docId) throws IOException {
this.docId = docId;
value1.setCurrentValue(docId);
@ -65,6 +79,7 @@ class QuadValueSortDoc extends TripleValueSortDoc {
value4.setCurrentValue(docId);
}
@Override
public void setValues(SortDoc sortDoc) {
this.docId = sortDoc.docId;
this.ord = sortDoc.ord;
@ -80,53 +95,61 @@ class QuadValueSortDoc extends TripleValueSortDoc {
this.value4 = value4;
}
@Override
public SortDoc copy() {
return new QuadValueSortDoc(value1.copy(), value2.copy(), value3.copy(), value4.copy());
}
@Override
public boolean lessThan(Object o) {
QuadValueSortDoc sd = (QuadValueSortDoc)o;
QuadValueSortDoc sd = (QuadValueSortDoc) o;
int comp = value1.compareTo(sd.value1);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
comp = value2.compareTo(sd.value2);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
comp = value3.compareTo(sd.value3);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
comp = value4.compareTo(sd.value4);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
return docId+docBase > sd.docId+sd.docBase;
return docId + docBase > sd.docId + sd.docBase;
}
}
}
}
}
public int compareTo(Object o) {
QuadValueSortDoc sd = (QuadValueSortDoc)o;
@Override
public int compareTo(SortDoc o) {
QuadValueSortDoc sd = (QuadValueSortDoc) o;
int comp = value1.compareTo(sd.value1);
if(comp == 0) {
if (comp == 0) {
comp = value2.compareTo(sd.value2);
if(comp == 0) {
if (comp == 0) {
comp = value3.compareTo(sd.value3);
if(comp == 0) {
return value4.compareTo(sd.value4);
if (comp == 0) {
comp = value4.compareTo(sd.value4);
if (comp == 0) {
return (sd.docId + sd.docBase) - (docId + docBase);
} else {
return comp;
}
} else {
return comp;
}

View File

@ -25,6 +25,7 @@ class SingleValueSortDoc extends SortDoc {
protected SortValue value1;
@Override
public SortValue getSortValue(String field) {
if (value1.getField().equals(field)) {
return value1;
@ -32,12 +33,14 @@ class SingleValueSortDoc extends SortDoc {
return null;
}
@Override
public void setNextReader(LeafReaderContext context) throws IOException {
this.ord = context.ord;
this.docBase = context.docBase;
value1.setNextReader(context);
}
@Override
public void reset() {
this.docId = -1;
this.docBase = -1;
@ -45,16 +48,18 @@ class SingleValueSortDoc extends SortDoc {
this.value1.reset();
}
@Override
public void setValues(int docId) throws IOException {
this.docId = docId;
value1.setCurrentValue(docId);
}
@Override
public void setValues(SortDoc sortDoc) {
this.docId = sortDoc.docId;
this.ord = sortDoc.ord;
this.docBase = sortDoc.docBase;
value1.setCurrentValue(((SingleValueSortDoc)sortDoc).value1);
value1.setCurrentValue(((SingleValueSortDoc) sortDoc).value1);
}
public SingleValueSortDoc(SortValue value1) {
@ -62,25 +67,39 @@ class SingleValueSortDoc extends SortDoc {
this.value1 = value1;
}
@Override
public void setGlobalValues(SortDoc previous) {
SortValue previousValue = ((SingleValueSortDoc) previous).value1;
value1.toGlobalValue(previousValue);
}
@Override
public SortDoc copy() {
return new SingleValueSortDoc(value1.copy());
}
@Override
public boolean lessThan(Object o) {
SingleValueSortDoc sd = (SingleValueSortDoc)o;
SingleValueSortDoc sd = (SingleValueSortDoc) o;
int comp = value1.compareTo(sd.value1);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
return docId+docBase > sd.docId+sd.docBase;
return docId + docBase > sd.docId + sd.docBase;
}
}
public int compareTo(Object o) {
SingleValueSortDoc sd = (SingleValueSortDoc)o;
return value1.compareTo(sd.value1);
@Override
public int compareTo(SortDoc o) {
SingleValueSortDoc sd = (SingleValueSortDoc) o;
int comp = value1.compareTo(sd.value1);
if (comp == 0) {
return (sd.docId + sd.docBase) - (docId + docBase);
} else {
return comp;
}
}
public String toString() {

View File

@ -18,10 +18,11 @@
package org.apache.solr.handler.export;
import java.io.IOException;
import java.util.Objects;
import org.apache.lucene.index.LeafReaderContext;
class SortDoc {
class SortDoc implements Comparable<SortDoc> {
protected int docId = -1;
protected int ord = -1;
@ -34,6 +35,21 @@ class SortDoc {
}
public SortDoc() {
}
@Override
public boolean equals(Object obj) {
// subclasses are not equal
if (!obj.getClass().equals(getClass())) {
return false;
}
return compareTo((SortDoc) obj) == 0;
}
@Override
public int hashCode() {
return Objects.hash(docId, ord, docBase);
}
public SortValue getSortValue(String field) {
@ -69,6 +85,13 @@ class SortDoc {
}
}
public void setGlobalValues(SortDoc previous) {
SortValue[] previousValues = previous.sortValues;
for (int i = 0; i < sortValues.length; i++) {
sortValues[i].toGlobalValue(previousValues[i]);
}
}
public void setValues(SortDoc sortDoc) {
this.docId = sortDoc.docId;
this.ord = sortDoc.ord;
@ -84,7 +107,6 @@ class SortDoc {
for (int i = 0; i < sortValues.length; i++) {
svs[i] = sortValues[i].copy();
}
return new SortDoc(svs);
}
@ -92,7 +114,7 @@ class SortDoc {
if (docId == -1) {
return true;
}
SortDoc sd = (SortDoc)o;
SortDoc sd = (SortDoc) o;
SortValue[] sortValues1 = sd.sortValues;
for (int i = 0; i < sortValues.length; i++) {
int comp = sortValues[i].compareTo(sortValues1[i]);
@ -105,18 +127,17 @@ class SortDoc {
return docId + docBase > sd.docId + sd.docBase; //index order
}
public int compareTo(Object o) {
SortDoc sd = (SortDoc)o;
@Override
public int compareTo(SortDoc sd) {
for (int i = 0; i < sortValues.length; i++) {
int comp = sortValues[i].compareTo(sd.sortValues[i]);
if (comp != 0) {
return comp;
}
}
return 0;
return (sd.docId + sd.docBase) - (docId + docBase);
}
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(ord).append(':').append(docBase).append(':').append(docId).append("; ");

View File

@ -21,18 +21,19 @@ import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
public interface SortValue extends Comparable<SortValue> {
public void setCurrentValue(int docId) throws IOException;
public void setNextReader(LeafReaderContext context) throws IOException;
public void setCurrentValue(SortValue value);
public void reset();
public SortValue copy();
public Object getCurrentValue() throws IOException;
public String getField();
interface SortValue extends Comparable<SortValue> {
void setCurrentValue(int docId) throws IOException;
void setNextReader(LeafReaderContext context) throws IOException;
void setCurrentValue(SortValue value);
void toGlobalValue(SortValue previousValue);
void reset();
SortValue copy();
Object getCurrentValue() throws IOException;
String getField();
/**
*
* @return true if document has a value for the specified field
*/
public boolean isPresent();
boolean isPresent();
}

View File

@ -18,11 +18,9 @@
package org.apache.solr.handler.export;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.carrotsearch.hppc.IntObjectHashMap;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
@ -32,10 +30,13 @@ import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.schema.FieldType;
class StringFieldWriter extends FieldWriter {
private String field;
protected String field;
private FieldType fieldType;
private Map<Integer, SortedDocValues> lastDocValues = new HashMap<>();
private CharsRefBuilder cref = new CharsRefBuilder();
private BytesRef lastRef;
private int lastOrd = -1;
private IntObjectHashMap<SortedDocValues> docValuesCache = new IntObjectHashMap<>();
protected CharsRefBuilder cref = new CharsRefBuilder();
final ByteArrayUtf8CharSequence utf8 = new ByteArrayUtf8CharSequence(new byte[0], 0, 0) {
@Override
public String toString() {
@ -53,48 +54,69 @@ class StringFieldWriter extends FieldWriter {
this.fieldType = fieldType;
}
public boolean write(SortDoc sortDoc, LeafReader reader, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
BytesRef ref;
SortValue sortValue = sortDoc.getSortValue(this.field);
if (sortValue != null) {
if (sortValue.isPresent()) {
ref = (BytesRef) sortValue.getCurrentValue();
} else { //empty-value
public boolean write(SortDoc sortDoc, LeafReaderContext readerContext, MapWriter.EntryWriter ew, int fieldIndex) throws IOException {
StringValue stringValue = (StringValue) sortDoc.getSortValue(this.field);
BytesRef ref = null;
if (stringValue != null) {
/*
We already have the top level ordinal used for sorting.
Now let's use it for caching the BytesRef so we don't have to look it up.
When we have long runs of repeated values do to the sort order of the docs this is a huge win.
*/
if(stringValue.currentOrd == -1) {
//Null sort value
return false;
}
} else {
// field is not part of 'sort' param, but part of 'fl' param
SortedDocValues vals = lastDocValues.get(sortDoc.ord);
if (vals == null || vals.docID() >= sortDoc.docId) {
vals = DocValues.getSorted(reader, this.field);
lastDocValues.put(sortDoc.ord, vals);
if (this.lastOrd == stringValue.currentOrd) {
ref = lastRef;
}
this.lastOrd = stringValue.currentOrd;
}
if (ref == null) {
//Reuse the last DocValues object if possible
int readerOrd = readerContext.ord;
SortedDocValues vals = null;
if(docValuesCache.containsKey(readerOrd)) {
SortedDocValues sortedDocValues = docValuesCache.get(readerOrd);
if(sortedDocValues.docID() < sortDoc.docId) {
//We have not advanced beyond the current docId so we can use this docValues.
vals = sortedDocValues;
}
}
if(vals == null) {
vals = DocValues.getSorted(readerContext.reader(), this.field);
docValuesCache.put(readerOrd, vals);
}
if (vals.advance(sortDoc.docId) != sortDoc.docId) {
return false;
}
int ord = vals.ordValue();
ref = vals.lookupOrd(ord);
if(stringValue != null) {
//Don't need to set the lastRef if it's not a sort value.
lastRef = ref.clone();
}
}
writeBytes(ew, ref, fieldType);
return true;
}
protected void writeBytes(MapWriter.EntryWriter ew, BytesRef ref, FieldType fieldType) throws IOException {
if (ew instanceof JavaBinCodec.BinEntryWriter) {
ew.put(this.field, utf8.reset(ref.bytes, ref.offset, ref.length, null));
} else {
String v = null;
if (sortValue != null) {
v = ((StringValue) sortValue).getLastString();
if (v == null) {
fieldType.indexedToReadable(ref, cref);
v = cref.toString();
((StringValue) sortValue).setLastString(v);
ew.put(this.field, cref.toString());
}
} else {
fieldType.indexedToReadable(ref, cref);
v = cref.toString();
}
ew.put(this.field, v);
}
return true;
}
}

View File

@ -38,13 +38,14 @@ class StringValue implements SortValue {
protected LongValues toGlobal = LongValues.IDENTITY; // this segment to global ordinal. NN;
protected SortedDocValues docValues;
protected int currentOrd;
public int currentOrd;
protected int lastDocID;
private boolean present;
private BytesRef lastBytes;
private String lastString;
private int lastOrd = -1;
private int leafOrd = -1;
public StringValue(SortedDocValues globalDocValues, String field, IntComp comp) {
this.globalDocValues = globalDocValues;
@ -74,18 +75,22 @@ class StringValue implements SortValue {
}
public void setCurrentValue(int docId) throws IOException {
//System.out.println(docId +":"+lastDocID);
/*
if (docId < lastDocID) {
throw new AssertionError("docs were sent out-of-order: lastDocID=" + lastDocID + " vs doc=" + docId);
}
lastDocID = docId;
*/
if (docId > docValues.docID()) {
docValues.advance(docId);
}
if (docId == docValues.docID()) {
present = true;
currentOrd = (int) toGlobal.get(docValues.ordValue());
currentOrd = docValues.ordValue();
} else {
present = false;
currentOrd = -1;
@ -98,9 +103,12 @@ class StringValue implements SortValue {
}
public void setCurrentValue(SortValue sv) {
StringValue v = (StringValue)sv;
StringValue v = (StringValue) sv;
this.currentOrd = v.currentOrd;
this.present = v.present;
this.leafOrd = v.leafOrd;
this.lastOrd = v.lastOrd;
this.toGlobal = v.toGlobal;
}
public Object getCurrentValue() throws IOException {
@ -113,11 +121,27 @@ class StringValue implements SortValue {
return lastBytes;
}
public void toGlobalValue(SortValue previousValue) {
lastOrd = currentOrd;
StringValue sv = (StringValue) previousValue;
if (sv.lastOrd == currentOrd) {
//Take the global ord from the previousValue unless we are a -1 which is the same in both global and leaf ordinal
if(this.currentOrd != -1) {
this.currentOrd = sv.currentOrd;
}
} else {
if(this.currentOrd > -1) {
this.currentOrd = (int) toGlobal.get(this.currentOrd);
}
}
}
public String getField() {
return field;
}
public void setNextReader(LeafReaderContext context) throws IOException {
leafOrd = context.ord;
if (ordinalMap != null) {
toGlobal = ordinalMap.getGlobalOrds(context.ord);
}
@ -128,6 +152,7 @@ class StringValue implements SortValue {
public void reset() {
this.currentOrd = comp.resetValue();
this.present = false;
lastDocID = 0;
}
public int compareTo(SortValue o) {

View File

@ -25,6 +25,7 @@ class TripleValueSortDoc extends DoubleValueSortDoc {
protected SortValue value3;
@Override
public SortValue getSortValue(String field) {
if (value1.getField().equals(field)) {
return value1;
@ -36,6 +37,7 @@ class TripleValueSortDoc extends DoubleValueSortDoc {
return null;
}
@Override
public void setNextReader(LeafReaderContext context) throws IOException {
this.ord = context.ord;
this.docBase = context.docBase;
@ -44,6 +46,7 @@ class TripleValueSortDoc extends DoubleValueSortDoc {
value3.setNextReader(context);
}
@Override
public void reset() {
this.docId = -1;
this.docBase = -1;
@ -53,6 +56,7 @@ class TripleValueSortDoc extends DoubleValueSortDoc {
value3.reset();
}
@Override
public void setValues(int docId) throws IOException {
this.docId = docId;
value1.setCurrentValue(docId);
@ -60,13 +64,22 @@ class TripleValueSortDoc extends DoubleValueSortDoc {
value3.setCurrentValue(docId);
}
@Override
public void setGlobalValues(SortDoc previous) {
TripleValueSortDoc tripleValueSortDoc = (TripleValueSortDoc) previous;
value1.toGlobalValue(tripleValueSortDoc.value1);
value2.toGlobalValue(tripleValueSortDoc.value2);
value3.toGlobalValue(tripleValueSortDoc.value3);
}
@Override
public void setValues(SortDoc sortDoc) {
this.docId = sortDoc.docId;
this.ord = sortDoc.ord;
this.docBase = sortDoc.docBase;
value1.setCurrentValue(((TripleValueSortDoc)sortDoc).value1);
value2.setCurrentValue(((TripleValueSortDoc)sortDoc).value2);
value3.setCurrentValue(((TripleValueSortDoc)sortDoc).value3);
value1.setCurrentValue(((TripleValueSortDoc) sortDoc).value1);
value2.setCurrentValue(((TripleValueSortDoc) sortDoc).value2);
value3.setCurrentValue(((TripleValueSortDoc) sortDoc).value3);
}
public TripleValueSortDoc(SortValue value1, SortValue value2, SortValue value3) {
@ -74,44 +87,51 @@ class TripleValueSortDoc extends DoubleValueSortDoc {
this.value3 = value3;
}
@Override
public SortDoc copy() {
return new TripleValueSortDoc(value1.copy(), value2.copy(), value3.copy());
}
@Override
public boolean lessThan(Object o) {
TripleValueSortDoc sd = (TripleValueSortDoc)o;
TripleValueSortDoc sd = (TripleValueSortDoc) o;
int comp = value1.compareTo(sd.value1);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
comp = value2.compareTo(sd.value2);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
comp = value3.compareTo(sd.value3);
if(comp == -1) {
if (comp == -1) {
return true;
} else if (comp == 1) {
return false;
} else {
return docId+docBase > sd.docId+sd.docBase;
return docId + docBase > sd.docId + sd.docBase;
}
}
}
}
public int compareTo(Object o) {
TripleValueSortDoc sd = (TripleValueSortDoc)o;
@Override
public int compareTo(SortDoc o) {
TripleValueSortDoc sd = (TripleValueSortDoc) o;
int comp = value1.compareTo(sd.value1);
if (comp == 0) {
comp = value2.compareTo(sd.value2);
if (comp == 0) {
return value3.compareTo(sd.value3);
comp = value3.compareTo(sd.value3);
if (comp == 0) {
return (sd.docId + sd.docBase) - (docId + docBase);
} else {
return comp;
}
} else {
return comp;
}

View File

@ -503,55 +503,60 @@ public class SolrLogPostTool {
private void addParams(SolrInputDocument doc, String params) {
String[] pairs = params.split("&");
for(String pair : pairs) {
for (String pair : pairs) {
String[] parts = pair.split("=");
if(parts.length == 2 && parts[0].equals("q")) {
if (parts.length == 2 && parts[0].equals("q")) {
String dq = URLDecoder.decode(parts[1], Charset.defaultCharset());
setFieldIfUnset(doc, "q_s", dq);
setFieldIfUnset(doc, "q_t", dq);
}
if(parts[0].equals("rows")) {
if (parts[0].equals("rows")) {
String dr = URLDecoder.decode(parts[1], Charset.defaultCharset());
setFieldIfUnset(doc, "rows_i", dr);
}
if(parts[0].equals("distrib")) {
if (parts[0].equals("start")) {
String dr = URLDecoder.decode(parts[1], Charset.defaultCharset());
setFieldIfUnset(doc, "start_i", dr);
}
if (parts[0].equals("distrib")) {
String dr = URLDecoder.decode(parts[1], Charset.defaultCharset());
setFieldIfUnset(doc, "distrib_s", dr);
}
if(parts[0].equals("shards")) {
if (parts[0].equals("shards")) {
setFieldIfUnset(doc, "shards_s", "true");
}
if(parts[0].equals("ids") && !isRTGRequest(doc)) {
if (parts[0].equals("ids") && !isRTGRequest(doc)) {
setFieldIfUnset(doc, "ids_s", "true");
}
if(parts[0].equals("isShard")) {
if (parts[0].equals("isShard")) {
String dr = URLDecoder.decode(parts[1], Charset.defaultCharset());
setFieldIfUnset(doc, "isShard_s", dr);
}
if(parts[0].equals("wt")) {
if (parts[0].equals("wt")) {
String dr = URLDecoder.decode(parts[1], Charset.defaultCharset());
setFieldIfUnset(doc, "wt_s", dr);
}
if(parts[0].equals("facet")) {
if (parts[0].equals("facet")) {
String dr = URLDecoder.decode(parts[1], Charset.defaultCharset());
setFieldIfUnset(doc, "facet_s", dr);
}
if(parts[0].equals("shards.purpose")) {
if (parts[0].equals("shards.purpose")) {
try {
int purpose = Integer.parseInt(parts[1]);
String[] purposes = getRequestPurposeNames(purpose);
for (String p : purposes) {
doc.addField("purpose_ss", p);
}
} catch(Throwable e) {
} catch (Throwable e) {
//We'll just sit on this for now and not interrupt the load for this one field.
}
}