SOLR-14537: Improve performance of ExportWriter.

This commit is contained in:
Andrzej Bialecki 2020-07-02 14:20:26 +02:00
parent 257a185107
commit 1b8fb70216
18 changed files with 903 additions and 498 deletions

View File

@ -102,6 +102,8 @@ Improvements
* SOLR-14523: Enhance gradle logging calls validation: eliminate getMessage() (Andras Salamon via Erick Erickson)
* SOLR-14537: Improve performance of ExportWriter. (ab, Joel Bernstein)
Optimizations
---------------------

View File

@ -34,6 +34,9 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.handler.export.ExportWriter;
import org.apache.solr.handler.export.ExportWriterStream;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.slf4j.Logger;
@ -51,6 +54,7 @@ public class ExportHandler extends SearchHandler {
private String coreName;
private SolrClientCache solrClientCache;
private StreamContext initialStreamContext;
private String writerMetricsPath;
public static class ExportHandlerStreamFactory extends SolrDefaultStreamFactory {
static final String[] forbiddenStreams = new String[] {
@ -67,10 +71,16 @@ public class ExportHandler extends SearchHandler {
for (String function : forbiddenStreams) {
this.withoutFunctionName(function);
}
this.withFunctionName("input", ExportWriter.ExportWriterStream.class);
this.withFunctionName("input", ExportWriterStream.class);
}
}
/**
 * Initializes the handler metrics through the superclass and then computes the metric
 * name prefix that is later passed to every {@link ExportWriter} created by this handler.
 *
 * @param parentContext metrics context forwarded to the superclass
 * @param scope registration scope; also used as the last component of the writer metrics path
 */
@Override
public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
  super.initializeMetrics(parentContext, scope);
  final String category = getCategory().toString();
  writerMetricsPath = SolrMetricManager.mkName("writer", category, scope);
}
@Override
public void inform(SolrCore core) {
super.inform(core);
@ -112,6 +122,6 @@ public class ExportHandler extends SearchHandler {
Map<String, String> map = new HashMap<>(1);
map.put(CommonParams.WT, ReplicationHandler.FILE_STREAM);
req.setParams(SolrParams.wrapDefaults(new MapSolrParams(map),req.getParams()));
rsp.add(ReplicationHandler.FILE_STREAM, new ExportWriter(req, rsp, wt, initialStreamContext));
rsp.add(ReplicationHandler.FILE_STREAM, new ExportWriter(req, rsp, wt, initialStreamContext, solrMetricsContext, writerMetricsPath));
}
}

View File

@ -44,6 +44,7 @@ class DoubleValueSortDoc extends SingleValueSortDoc {
public void reset() {
  // clear both sort values first, then the document coordinates;
  // -1 marks "no document" for each of the three identifiers
  value1.reset();
  value2.reset();
  this.ord = -1;
  this.docBase = -1;
  this.docId = -1;
}

View File

@ -0,0 +1,268 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.export;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import com.codahale.metrics.Timer;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Sort;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.search.SolrIndexSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helper class implementing a "double buffering" producer / consumer.
 * <p>Two {@link Buffer}-s are used: the producer (the {@link #filler} task, executed on a
 * dedicated single-thread executor) populates the "fill" buffer while the consumer (the
 * writer passed to {@link #run(Callable)}, executed on the calling thread) drains the
 * "output" buffer. Both sides meet on a two-party {@link CyclicBarrier}; its barrier
 * action swaps the two buffers.</p>
 * <p>The end of data is signalled by the producer by setting
 * {@link Buffer#outDocsIndex} to {@link Buffer#NO_MORE_DOCS} before the final exchange.</p>
 */
class ExportBuffers {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Maximum time either side waits at the barrier for the other side, in seconds. */
  static final long EXCHANGE_TIMEOUT_SECONDS = 600;
  /** Key under which an instance of this class is put into the stream context. */
  static final String EXPORT_BUFFERS_KEY = "__eb__";

  final Buffer bufferOne;
  final Buffer bufferTwo;
  final List<LeafReaderContext> leaves;
  final ExportWriter exportWriter;
  final OutputStream os;
  final Timer writeOutputBufferTimer;
  final Timer fillerWaitTimer;
  final Timer writerWaitTimer;
  final IteratorWriter.ItemWriter rawWriter;
  final IteratorWriter.ItemWriter writer;
  final CyclicBarrier barrier;
  final int totalHits;
  Buffer fillBuffer;
  Buffer outputBuffer;
  Runnable filler;
  // 'service' and 'error' are written and read by both the filler thread and the
  // thread that called run(), hence volatile
  volatile ExecutorService service;
  volatile Throwable error;
  LongAdder outputCounter = new LongAdder();
  volatile boolean shutDown = false;

  /**
   * Creates the two buffers, the barrier and the producer task. Nothing is started here,
   * see {@link #run(Callable)}.
   *
   * @param exportWriter writer used for creating sort docs and for filling the buffers
   * @param leaves index segments passed to {@link ExportWriter#fillOutDocs}
   * @param searcher searcher used for creating the prototype sort docs
   * @param os output stream of the response (kept for reference, currently not flushed here)
   * @param rawWriter the undecorated item writer
   * @param sort sort specification used for creating the prototype sort docs
   * @param queueSize capacity of each buffer and of the producer's sort queue
   * @param totalHits number of documents the producer has to deliver in total
   * @param writeOutputBufferTimer timer exposed via {@link #getWriteOutputBufferTimer()}
   * @param fillerWaitTimer timer measuring how long the producer waits at the barrier
   * @param writerWaitTimer timer exposed via {@link #getWriterWaitTimer()}
   * @throws IOException if the prototype sort doc cannot be created
   */
  ExportBuffers(ExportWriter exportWriter, List<LeafReaderContext> leaves, SolrIndexSearcher searcher,
                OutputStream os, IteratorWriter.ItemWriter rawWriter, Sort sort, int queueSize, int totalHits,
                Timer writeOutputBufferTimer, Timer fillerWaitTimer, Timer writerWaitTimer) throws IOException {
    this.exportWriter = exportWriter;
    this.leaves = leaves;
    this.os = os;
    this.rawWriter = rawWriter;
    // decorate the raw writer so that the number of emitted items can be tracked
    this.writer = new IteratorWriter.ItemWriter() {
      @Override
      public IteratorWriter.ItemWriter add(Object o) throws IOException {
        rawWriter.add(o);
        outputCounter.increment();
        return this;
      }
    };
    this.writeOutputBufferTimer = writeOutputBufferTimer;
    this.fillerWaitTimer = fillerWaitTimer;
    this.writerWaitTimer = writerWaitTimer;
    this.bufferOne = new Buffer(queueSize);
    this.bufferTwo = new Buffer(queueSize);
    this.totalHits = totalHits;
    fillBuffer = bufferOne;
    outputBuffer = bufferTwo;
    // pre-populate both buffers with reusable copies of the prototype sort doc
    SortDoc writerSortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
    bufferOne.initialize(writerSortDoc);
    bufferTwo.initialize(writerSortDoc);
    // the swap runs as the barrier action, i.e. once both parties have arrived
    barrier = new CyclicBarrier(2, this::swapBuffers);
    filler = () -> {
      try {
        log.debug("--- filler start {}", Thread.currentThread());
        SortDoc sortDoc = exportWriter.getSortDoc(searcher, sort.getSort());
        Buffer buffer = getFillBuffer();
        SortQueue queue = new SortQueue(queueSize, sortDoc);
        long lastOutputCounter = 0;
        for (int count = 0; count < totalHits; ) {
          log.debug("--- filler fillOutDocs in {}", buffer);
          exportWriter.fillOutDocs(leaves, sortDoc, queue, buffer);
          count += (buffer.outDocsIndex + 1);
          log.debug("--- filler count={}, exchange buffer from {}", count, buffer);
          exchangeBuffersTimed(getFillerWaitTimer());
          buffer = getFillBuffer();
          if (outputCounter.longValue() > lastOutputCounter) {
            lastOutputCounter = outputCounter.longValue();
            flushOutput();
          }
          log.debug("--- filler got empty buffer {}", buffer);
        }
        // tell the consumer that there is nothing more to come
        buffer.outDocsIndex = Buffer.NO_MORE_DOCS;
        log.debug("--- filler final exchange buffer from {}", buffer);
        exchangeBuffersTimed(getFillerWaitTimer());
        buffer = getFillBuffer();
        log.debug("--- filler final got buffer {}", buffer);
      } catch (Throwable e) {
        log.error("filler", e);
        error(e);
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        shutdownNow();
      }
    };
  }

  /**
   * Wait at the barrier until the other party arrives, at most
   * {@link #EXCHANGE_TIMEOUT_SECONDS}. On successful return the buffers have been swapped.
   *
   * @throws Exception whatever {@link CyclicBarrier#await(long, TimeUnit)} throws
   *         (interrupt, timeout or broken barrier)
   */
  public void exchangeBuffers() throws Exception {
    log.debug("---- wait exchangeBuffers from {}", Thread.currentThread());
    barrier.await(EXCHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
  }

  /** Same as {@link #exchangeBuffers()} but records the time spent waiting in the given timer. */
  private void exchangeBuffersTimed(Timer waitTimer) throws Exception {
    final Timer.Context timerContext = waitTimer.time();
    try {
      exchangeBuffers();
    } finally {
      timerContext.stop();
    }
  }

  /**
   * Record an error and release the other party if it is waiting at the barrier.
   *
   * @param t the error to report via {@link #getError()}
   */
  public void error(Throwable t) {
    error = t;
    // break the lock on the other thread too
    barrier.reset();
  }

  /** Returns the last error recorded with {@link #error(Throwable)}, or null. */
  public Throwable getError() {
    return error;
  }

  /** Barrier action: the fill buffer becomes the output buffer and vice versa. */
  private void swapBuffers() {
    log.debug("--- swap buffers");
    Buffer one = fillBuffer;
    fillBuffer = outputBuffer;
    outputBuffer = one;
  }

  /**
   * Hook invoked by the filler whenever the output counter advanced since the previous
   * exchange. Currently a no-op: flushing of the underlying stream is disabled.
   */
  private void flushOutput() throws IOException {
    // intentionally empty, see javadoc
  }

  /** Returns the buffer currently owned by the consumer (initially the second buffer). */
  public Buffer getOutputBuffer() {
    return outputBuffer;
  }

  /** Returns the buffer currently owned by the producer (initially the first buffer). */
  public Buffer getFillBuffer() {
    return fillBuffer;
  }

  public Timer getWriteOutputBufferTimer() {
    return writeOutputBufferTimer;
  }

  public Timer getFillerWaitTimer() {
    return fillerWaitTimer;
  }

  public Timer getWriterWaitTimer() {
    return writerWaitTimer;
  }

  /** Returns the decorated writer that keeps track of the number of writes. */
  public IteratorWriter.ItemWriter getWriter() {
    return writer;
  }

  /**
   * Stop the producer's executor (if it is running) and mark this instance as shut down.
   * May be invoked concurrently by the filler thread (on error) and by the thread running
   * {@link #run(Callable)}; the field is therefore read exactly once into a local so that
   * a concurrent invocation cannot null it between the check and the use.
   */
  public void shutdownNow() {
    final ExecutorService executor = service;
    if (executor != null) {
      log.debug("--- shutting down buffers");
      service = null;
      executor.shutdownNow();
    }
    shutDown = true;
  }

  public boolean isShutDown() {
    return shutDown;
  }

  /**
   * Start processing and block until complete or Exception is thrown.
   * <p>The producer is started on a dedicated thread, the writer is invoked on the calling
   * thread. Exceptions thrown by the writer are logged and recorded (see
   * {@link #getError()}), they are not re-thrown.</p>
   *
   * @param writer writer that exchanges and processes buffers received from a producer.
   * @throws IOException on errors
   */
  public void run(Callable<Boolean> writer) throws IOException {
    final ExecutorService executor =
        ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrNamedThreadFactory("ExportBuffers"));
    service = executor;
    try {
      CompletableFuture.runAsync(filler, executor);
      // the writer deliberately runs on the caller's thread; only the filler is asynchronous
      writer.call();
      log.debug("-- finished.");
    } catch (Exception e) {
      log.error("Exception running filler / writer", e);
      error(e);
    } finally {
      log.debug("--- all done, shutting down buffers");
      shutdownNow();
    }
  }

  /**
   * A fixed-size array of reusable {@link SortDoc}-s plus the index of the last valid entry.
   */
  public static final class Buffer {
    /** Value of {@link #outDocsIndex} when the buffer holds no documents. */
    static final int EMPTY = -1;
    /** Value of {@link #outDocsIndex} signalling that the producer is done. */
    static final int NO_MORE_DOCS = -2;

    int outDocsIndex = EMPTY;
    SortDoc[] outDocs;

    public Buffer(int size) {
      outDocs = new SortDoc[size];
    }

    /**
     * Mark the buffer as empty and fill every slot with a fresh copy of the prototype.
     *
     * @param proto prototype sort doc, copied once per slot
     */
    public void initialize(SortDoc proto) {
      outDocsIndex = EMPTY;
      for (int i = 0; i < outDocs.length; i++) {
        outDocs[i] = proto.copy();
      }
    }

    @Override
    public String toString() {
      return "Buffer@" + Integer.toHexString(hashCode()) + "{" +
          "outDocsIndex=" + outDocsIndex +
          '}';
    }
  }
}

View File

@ -26,6 +26,7 @@ import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.List;
import com.codahale.metrics.Timer;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
@ -36,18 +37,10 @@ import org.apache.lucene.util.BitSetIterator;
import org.apache.lucene.util.FixedBitSet;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.IteratorWriter;
@ -60,6 +53,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.StreamParams;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.BinaryResponseWriter;
@ -93,29 +87,26 @@ import static org.apache.solr.common.util.Utils.makeMap;
* {@link ExportWriter} gathers and sorts the documents for a core using "stream sorting".
* <p>
* Stream sorting works by repeatedly processing and modifying a bitmap of matching documents. Each pass over the
* bitmap identifies the smallest {@link #DOCUMENT_BATCH_SIZE} docs that haven't been sent yet and stores them in a
* bitmap identifies the smallest docs (default is {@link #DEFAULT_BATCH_SIZE}) that haven't been sent yet and stores them in a
* Priority Queue. They are then exported (written across the wire) and marked as sent (unset in the bitmap).
* This process repeats until all matching documents have been sent.
* <p>
* This streaming approach is light on memory (only {@link #DOCUMENT_BATCH_SIZE} documents are ever stored in memory at
* This streaming approach is light on memory (only up to 2x batch size documents are ever stored in memory at
* once), and it allows {@link ExportWriter} to scale well with regard to numDocs.
*/
public class ExportWriter implements SolrCore.RawWriter, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int DOCUMENT_BATCH_SIZE = 30000;
private static final String EXPORT_WRITER_KEY = "__ew__";
private static final String SORT_DOCS_KEY = "_ew_docs_";
private static final String TOTAL_HITS_KEY = "_ew_totalHits_";
private static final String LEAF_READERS_KEY = "_ew_leaves_";
private static final String SORT_QUEUE_KEY = "_ew_queue_";
private static final String SORT_DOC_KEY = "_ew_sort_";
public static final String BATCH_SIZE_PARAM = "batchSize";
public static final int DEFAULT_BATCH_SIZE = 30000;
private OutputStreamWriter respWriter;
final SolrQueryRequest req;
final SolrQueryResponse res;
final StreamContext initialStreamContext;
final SolrMetricsContext solrMetricsContext;
final String metricsPath;
final int batchSize;
StreamExpression streamExpression;
StreamContext streamContext;
FieldWriter[] fieldWriters;
@ -123,143 +114,28 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
FixedBitSet[] sets = null;
PushWriter writer;
private String wt;
private static class TupleEntryWriter implements EntryWriter {
Tuple tuple;
void setTuple(Tuple tuple) {
this.tuple = tuple;
}
@Override
public EntryWriter put(CharSequence k, Object v) throws IOException {
tuple.put(k, v);
return this;
}
}
public static class ExportWriterStream extends TupleStream implements Expressible {
StreamContext context;
StreamComparator streamComparator;
int pos = -1;
int outDocIndex = -1;
int count;
SortDoc sortDoc;
SortQueue queue;
SortDoc[] docs;
int totalHits;
ExportWriter exportWriter;
List<LeafReaderContext> leaves;
final TupleEntryWriter entryWriter = new TupleEntryWriter();
public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
streamComparator = parseComp(factory.getDefaultSort());
}
@Override
public void setStreamContext(StreamContext context) {
this.context = context;
}
@Override
public List<TupleStream> children() {
return null;
}
private StreamComparator parseComp(String sort) throws IOException {
String[] sorts = sort.split(",");
StreamComparator[] comps = new StreamComparator[sorts.length];
for(int i=0; i<sorts.length; i++) {
String s = sorts[i];
String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
if (spec.length != 2) {
throw new IOException("Invalid sort spec:" + s);
}
String fieldName = spec[0].trim();
String order = spec[1].trim();
comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
}
if(comps.length > 1) {
return new MultipleFieldComparator(comps);
} else {
return comps[0];
}
}
@Override
@SuppressWarnings({"unchecked"})
public void open() throws IOException {
docs = (SortDoc[]) context.get(SORT_DOCS_KEY);
queue = (SortQueue) context.get(SORT_QUEUE_KEY);
sortDoc = (SortDoc) context.get(SORT_DOC_KEY);
totalHits = (Integer) context.get(TOTAL_HITS_KEY);
exportWriter = (ExportWriter) context.get(EXPORT_WRITER_KEY);
leaves = (List<LeafReaderContext>) context.get(LEAF_READERS_KEY);
count = 0;
}
@Override
public void close() throws IOException {
exportWriter = null;
leaves = null;
}
@Override
public Tuple read() throws IOException {
if (pos < 0) {
if (count < totalHits) {
outDocIndex = exportWriter.fillOutDocs(leaves, sortDoc, queue, docs);
count += (outDocIndex + 1);
pos = outDocIndex;
} else {
return Tuple.EOF();
}
}
if (pos < 0) {
return Tuple.EOF();
}
Tuple tuple = new Tuple();
entryWriter.setTuple(tuple);
SortDoc s = docs[pos];
exportWriter.writeDoc(s, leaves, entryWriter);
s.reset();
pos--;
return tuple;
}
@Override
public StreamComparator getStreamSort() {
return streamComparator;
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("input")
.withImplementingClass(this.getClass().getName())
.withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
.withExpression("--non-expressible--");
}
}
final Timer identifyLowestSortingDocTimer;
final Timer transferBatchToBufferTimer;
final Timer writeOutputBufferTimer;
final Timer writerWaitTimer;
final Timer fillerWaitTimer;
public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt, StreamContext initialStreamContext) {
public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt,
StreamContext initialStreamContext, SolrMetricsContext solrMetricsContext,
String metricsPath) {
this.req = req;
this.res = res;
this.wt = wt;
this.initialStreamContext = initialStreamContext;
this.solrMetricsContext = solrMetricsContext;
this.metricsPath = metricsPath;
this.batchSize = req.getParams().getInt(BATCH_SIZE_PARAM, DEFAULT_BATCH_SIZE);
identifyLowestSortingDocTimer = solrMetricsContext.timer("identifyLowestSortingDoc", metricsPath);
transferBatchToBufferTimer = solrMetricsContext.timer("transferBatchToBuffer", metricsPath);
writeOutputBufferTimer = solrMetricsContext.timer("writeOutputBuffer", metricsPath);
writerWaitTimer = solrMetricsContext.timer("writerWaitTimer", metricsPath);
fillerWaitTimer = solrMetricsContext.timer("fillerWaitTimer", metricsPath);
}
@Override
@ -409,7 +285,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
m.put("responseHeader", singletonMap("status", 0));
m.put("response", (MapWriter) mw -> {
mw.put("numFound", totalHits);
mw.put("docs", (IteratorWriter) iw -> writeDocs(req, iw, sort));
mw.put("docs", (IteratorWriter) iw -> writeDocs(req, os, iw, sort));
});
});
if (streamContext != null) {
@ -427,6 +303,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
protected void identifyLowestSortingUnexportedDocs(List<LeafReaderContext> leaves, SortDoc sortDoc, SortQueue queue) throws IOException {
Timer.Context timerContext = identifyLowestSortingDocTimer.time();
try {
queue.reset();
SortDoc top = queue.top();
for (int i = 0; i < leaves.size(); i++) {
@ -441,107 +319,142 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
}
}
} finally {
timerContext.stop();
}
}
protected int transferBatchToArrayForOutput(SortQueue queue, SortDoc[] destinationArr) {
protected void transferBatchToBufferForOutput(SortQueue queue,
List<LeafReaderContext> leaves,
ExportBuffers.Buffer destination) throws IOException {
Timer.Context timerContext = transferBatchToBufferTimer.time();
try {
int outDocsIndex = -1;
for (int i = 0; i < queue.maxSize; i++) {
SortDoc s = queue.pop();
if (s.docId > -1) {
destinationArr[++outDocsIndex] = s;
destination.outDocs[++outDocsIndex].setValues(s);
// remove this doc id from the matching bitset, it's been exported
sets[s.ord].clear(s.docId);
s.reset(); // reuse
}
}
destination.outDocsIndex = outDocsIndex;
} catch (Throwable t) {
log.error("transfer", t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw t;
} finally {
timerContext.stop();
}
}
return outDocsIndex;
}
protected void addDocsToItemWriter(List<LeafReaderContext> leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException {
try {
for (int i = outDocsIndex; i >= 0; --i) {
SortDoc s = docsToExport[i];
writer.add((MapWriter) ew -> {
writeDoc(s, leaves, ew);
s.reset();
});
}
} catch (Throwable e) {
Throwable ex = e;
while (ex != null) {
String m = ex.getMessage();
if (m != null && m.contains("Broken pipe")) {
throw new IgnoreException();
}
ex = ex.getCause();
}
if (e instanceof IOException) {
throw ((IOException) e);
} else {
throw new IOException(e);
}
}
}
protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
protected void writeDocs(SolrQueryRequest req, OutputStream os, IteratorWriter.ItemWriter writer, Sort sort) throws IOException {
List<LeafReaderContext> leaves = req.getSearcher().getTopReaderContext().leaves();
SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort());
final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits);
final int queueSize = Math.min(batchSize, totalHits);
SortQueue queue = new SortQueue(queueSize, sortDoc);
SortDoc[] outDocs = new SortDoc[queueSize];
ExportBuffers buffers = new ExportBuffers(this, leaves, req.getSearcher(), os, writer, sort, queueSize, totalHits,
writeOutputBufferTimer, fillerWaitTimer, writerWaitTimer);
if (streamExpression != null) {
streamContext.put(SORT_DOCS_KEY, outDocs);
streamContext.put(SORT_QUEUE_KEY, queue);
streamContext.put(SORT_DOC_KEY, sortDoc);
streamContext.put(TOTAL_HITS_KEY, totalHits);
streamContext.put(EXPORT_WRITER_KEY, this);
streamContext.put(LEAF_READERS_KEY, leaves);
TupleStream tupleStream = createTupleStream();
streamContext.put(ExportBuffers.EXPORT_BUFFERS_KEY, buffers);
final TupleStream tupleStream;
try {
tupleStream = createTupleStream();
tupleStream.open();
} catch (Exception e) {
buffers.getWriter().add((MapWriter) ew -> Tuple.EXCEPTION(e, true).writeMap(ew));
return;
}
buffers.run(() -> {
for (;;) {
final Tuple t = tupleStream.read();
if (Thread.currentThread().isInterrupted()) {
break;
}
final Tuple t;
try {
t = tupleStream.read();
} catch (final Exception e) {
buffers.getWriter().add((MapWriter) ew -> Tuple.EXCEPTION(e, true).writeMap(ew));
break;
}
if (t == null) {
break;
}
if (t.EOF) {
if (t.EOF && !t.EXCEPTION) {
break;
}
writer.add((MapWriter) ew -> t.writeMap(ew));
// use decorated writer to monitor the number of output writes
// and flush the output quickly in case of very few (reduced) output items
buffers.getWriter().add((MapWriter) ew -> t.writeMap(ew));
if (t.EXCEPTION && t.EOF) {
break;
}
}
return true;
});
tupleStream.close();
} else {
for (int count = 0; count < totalHits; ) {
int outDocsIndex = fillOutDocs(leaves, sortDoc, queue, outDocs);
count += (outDocsIndex + 1);
addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex);
buffers.run(() -> {
// get the initial buffer
log.debug("--- writer init exchanging from empty");
buffers.exchangeBuffers();
ExportBuffers.Buffer buffer = buffers.getOutputBuffer();
log.debug("--- writer init got {}", buffer);
while (buffer.outDocsIndex != ExportBuffers.Buffer.NO_MORE_DOCS) {
if (Thread.currentThread().isInterrupted()) {
log.debug("--- writer interrupted");
break;
}
Timer.Context timerContext = writeOutputBufferTimer.time();
try {
for (int i = buffer.outDocsIndex; i >= 0; --i) {
// we're using the raw writer here because there's no potential
// reduction in the number of output items, unlike when using
// streaming expressions
final SortDoc currentDoc = buffer.outDocs[i];
writer.add((MapWriter) ew -> writeDoc(currentDoc, leaves, ew, fieldWriters));
}
} finally {
timerContext.stop();
}
log.debug("--- writer exchanging from {}", buffer);
timerContext = writerWaitTimer.time();
try {
buffers.exchangeBuffers();
} finally {
timerContext.stop();
}
buffer = buffers.getOutputBuffer();
log.debug("--- writer got {}", buffer);
}
return true;
});
}
}
private int fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
SortQueue sortQueue, SortDoc[] outDocs) throws IOException {
void fillOutDocs(List<LeafReaderContext> leaves, SortDoc sortDoc,
SortQueue sortQueue, ExportBuffers.Buffer buffer) throws IOException {
identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue);
return transferBatchToArrayForOutput(sortQueue, outDocs);
transferBatchToBufferForOutput(sortQueue, leaves, buffer);
}
void writeDoc(SortDoc sortDoc,
List<LeafReaderContext> leaves,
EntryWriter ew) throws IOException {
EntryWriter ew, FieldWriter[] writers) throws IOException {
int ord = sortDoc.ord;
FixedBitSet set = sets[ord];
set.clear(sortDoc.docId);
LeafReaderContext context = leaves.get(ord);
int fieldIndex = 0;
for (FieldWriter fieldWriter : fieldWriters) {
for (FieldWriter fieldWriter : writers) {
if (fieldWriter.write(sortDoc, context.reader(), ew, fieldIndex)) {
++fieldIndex;
}
}
}
protected FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
public FieldWriter[] getFieldWriters(String[] fields, SolrIndexSearcher searcher) throws IOException {
IndexSchema schema = searcher.getSchema();
FieldWriter[] writers = new FieldWriter[fields.length];
for (int i = 0; i < fields.length; i++) {
@ -613,7 +526,7 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
return writers;
}
private SortDoc getSortDoc(SolrIndexSearcher searcher, SortField[] sortFields) throws IOException {
SortDoc getSortDoc(SolrIndexSearcher searcher, SortField[] sortFields) throws IOException {
SortValue[] sortValues = new SortValue[sortFields.length];
IndexSchema schema = searcher.getSchema();
for (int i = 0; i < sortFields.length; ++i) {

View File

@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.export;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeoutException;
import com.codahale.metrics.Timer;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Stream implementation that helps supporting 'expr' streaming in export writer.
* <p>Note: this class is made public only to allow access from {@link org.apache.solr.handler.ExportHandler},
* it should be treated as an internal detail of implementation.</p>
* @lucene.experimental
*/
public class ExportWriterStream extends TupleStream implements Expressible {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final TupleEntryWriter tupleEntryWriter = new TupleEntryWriter();
StreamContext context;
StreamComparator streamComparator;
int pos = -1;
ExportBuffers exportBuffers;
ExportBuffers.Buffer buffer;
Timer.Context writeOutputTimerContext;
private static final class TupleEntryWriter implements EntryWriter {
Tuple tuple;
@Override
public EntryWriter put(CharSequence k, Object v) throws IOException {
if (v instanceof IteratorWriter) {
List lst = new ArrayList();
((IteratorWriter)v).toList(lst);
v = lst;
} else if (v instanceof MapWriter) {
Map<String, Object> map = new HashMap<>();
((MapWriter)v).toMap(map);
v = map;
}
tuple.put(k.toString(), v);
return this;
}
}
public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException {
streamComparator = parseComp(factory.getDefaultSort());
}
/**
* NOTE: this context must contain an instance of {@link ExportBuffers} under the
* {@link ExportBuffers#EXPORT_BUFFERS_KEY} key.
*/
@Override
public void setStreamContext(StreamContext context) {
this.context = context;
}
@Override
public List<TupleStream> children() {
return null;
}
private StreamComparator parseComp(String sort) throws IOException {
String[] sorts = sort.split(",");
StreamComparator[] comps = new StreamComparator[sorts.length];
for (int i = 0; i < sorts.length; i++) {
String s = sorts[i];
String[] spec = s.trim().split("\\s+"); //This should take into account spaces in the sort spec.
if (spec.length != 2) {
throw new IOException("Invalid sort spec:" + s);
}
String fieldName = spec[0].trim();
String order = spec[1].trim();
comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING);
}
if (comps.length > 1) {
return new MultipleFieldComparator(comps);
} else {
return comps[0];
}
}
@Override
public void open() throws IOException {
exportBuffers = (ExportBuffers) context.get(ExportBuffers.EXPORT_BUFFERS_KEY);
buffer = exportBuffers.getOutputBuffer();
}
@Override
public void close() throws IOException {
if (writeOutputTimerContext != null) {
writeOutputTimerContext.stop();
}
exportBuffers = null;
}
/**
 * Returns the next document of the current output buffer as a {@link Tuple}.
 * <p>
 * When the current buffer has been fully consumed ({@code pos < 0}) the buffers are
 * exchanged with the other side first. Exchange timeouts are retried until the
 * exchange succeeds or {@code exportBuffers.isShutDown()} reports true; an interrupt,
 * a broken barrier or any other exception ends the exchange attempt with an
 * EXCEPTION tuple.
 *
 * @return a tuple built from the next buffered document, or an EOF / EXCEPTION tuple
 */
@Override
public Tuple read() throws IOException {
  Tuple res = null;
  if (pos < 0) {
    // current buffer is used up: stop timing the output and swap buffers
    if (writeOutputTimerContext != null) {
      writeOutputTimerContext.stop();
      writeOutputTimerContext = null;
    }
    try {
      buffer.outDocsIndex = ExportBuffers.Buffer.EMPTY;
      log.debug("--- ews exchange empty buffer {}", buffer);
      boolean exchanged = false;
      while (!exchanged) {
        Timer.Context timerContext = exportBuffers.getWriterWaitTimer().time();
        try {
          exportBuffers.exchangeBuffers();
          exchanged = true;
        } catch (TimeoutException e) {
          log.debug("--- ews timeout loop");
          if (exportBuffers.isShutDown()) {
            log.debug("--- ews - the other end is shutdown, returning EOF");
            res = Tuple.EOF();
            break;
          }
          // not shut down yet: try the exchange again
          continue;
        } catch (InterruptedException e) {
          log.debug("--- ews interrupted");
          exportBuffers.error(e);
          res = Tuple.EXCEPTION(e, true);
          break;
        } catch (BrokenBarrierException e) {
          // prefer the error recorded on the shared buffers over the bare barrier failure
          if (exportBuffers.getError() != null) {
            res = Tuple.EXCEPTION(exportBuffers.getError(), true);
          } else {
            res = Tuple.EXCEPTION(e, true);
          }
          break;
        } finally {
          timerContext.stop();
        }
      }
    } catch (InterruptedException e) {
      log.debug("--- ews interrupt");
      exportBuffers.error(e);
      res = Tuple.EXCEPTION(e, true);
    } catch (Exception e) {
      log.debug("--- ews exception", e);
      exportBuffers.error(e);
      res = Tuple.EXCEPTION(e, true);
    }
    buffer = exportBuffers.getOutputBuffer();
    if (buffer == null) {
      // no buffer to read from; must not fall through to the dereference below
      res = Tuple.EOF();
    } else if (buffer.outDocsIndex == ExportBuffers.Buffer.NO_MORE_DOCS) {
      log.debug("--- ews EOF");
      res = Tuple.EOF();
    } else {
      pos = buffer.outDocsIndex;
      log.debug("--- ews new pos={}", pos);
    }
  }
  if (pos < 0) {
    log.debug("--- ews EOF?");
    res = Tuple.EOF();
  }
  if (res != null) {
    // only errors or EOF assigned result so far
    if (writeOutputTimerContext != null) {
      writeOutputTimerContext.stop();
      // cleared so that a later close() does not stop the same context again
      writeOutputTimerContext = null;
    }
    return res;
  }
  if (writeOutputTimerContext == null) {
    writeOutputTimerContext = exportBuffers.getWriteOutputBufferTimer().time();
  }
  // documents are consumed from the highest index down to 0
  SortDoc sortDoc = buffer.outDocs[pos];
  tupleEntryWriter.tuple = new Tuple();
  exportBuffers.exportWriter.writeDoc(sortDoc, exportBuffers.leaves, tupleEntryWriter, exportBuffers.exportWriter.fieldWriters);
  pos--;
  return tupleEntryWriter.tuple;
}
/** Returns the {@code streamComparator} held by this stream. */
@Override
public StreamComparator getStreamSort() {
  return this.streamComparator;
}
/**
 * Builds a parameterless expression named after the function under which the
 * given factory knows this class.
 */
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
  return new StreamExpression(factory.getFunctionName(getClass()));
}
/**
 * Describes this stream as a non-expressible stream source registered under the
 * function name "input".
 */
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
  final String nodeId = getStreamNodeId().toString();
  final String implementingClass = getClass().getName();
  return new StreamExplanation(nodeId)
      .withFunctionName("input")
      .withImplementingClass(implementingClass)
      .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
      .withExpression("--non-expressible--");
}
}

View File

@ -25,10 +25,10 @@ import org.apache.lucene.index.NumericDocValues;
public class LongValue implements SortValue {
final protected String field;
final protected LongComp comp;
protected NumericDocValues vals;
protected String field;
protected long currentValue;
protected LongComp comp;
private int lastDocID;
private boolean present;

View File

@ -1,218 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.export;
import org.apache.lucene.util.ArrayUtil;
/**
 * Fixed-capacity binary heap whose ordering is supplied by subclasses through
 * {@link #lessThan(Object, Object)}. The backing array is 1-based: slot 0 is
 * never used and the least element always sits in slot 1.
 */
public abstract class PriorityQueue<T> {
// number of elements currently stored in the heap
protected int size = 0;
// capacity requested at construction time
protected final int maxSize;
// 1-based backing array; heap[0] is unused
private final T[] heap;
/** Creates a queue of the given capacity that is pre-populated with sentinel objects, if the subclass supplies them. */
public PriorityQueue(int maxSize) {
this(maxSize, true);
}
/**
 * Creates a queue of the given capacity.
 *
 * @param maxSize maximum number of elements; must not exceed <code>ArrayUtil.MAX_ARRAY_LENGTH</code>
 * @param prepopulate if true and {@link #getSentinelObject()} returns non-null, every slot is filled with a sentinel
 * @throws IllegalArgumentException if <code>maxSize</code> is larger than <code>ArrayUtil.MAX_ARRAY_LENGTH</code>
 */
public PriorityQueue(int maxSize, boolean prepopulate) {
final int heapSize;
if (0 == maxSize) {
// We allocate 1 extra to avoid if statement in top()
heapSize = 2;
} else {
if (maxSize > ArrayUtil.MAX_ARRAY_LENGTH) {
// Reject the request up front instead of attempting the
// allocation: an oversized array request would otherwise
// surface as a confusing failure further down, so an explicit
// exception naming the limit is more helpful to the caller.
throw new IllegalArgumentException("maxSize must be <= " + ArrayUtil.MAX_ARRAY_LENGTH + "; got: " + maxSize);
} else {
// NOTE: we add +1 because all access to heap is
// 1-based not 0-based. heap[0] is unused.
heapSize = maxSize + 1;
}
}
// T is unbounded type, so this unchecked cast works always:
@SuppressWarnings("unchecked") final T[] h = (T[]) new Object[heapSize];
this.heap = h;
this.maxSize = maxSize;
if (prepopulate) {
// If sentinel objects are supported, populate the queue with them
T sentinel = getSentinelObject();
if (sentinel != null) {
heap[1] = sentinel;
// one fresh sentinel per remaining slot
for (int i = 2; i < heap.length; i++) {
heap[i] = getSentinelObject();
}
size = maxSize;
}
}
}
/** Determines the ordering of objects in this priority queue. Subclasses
* must define this one method.
* @return <code>true</code> iff parameter <code>a</code> is less than parameter <code>b</code>.
*/
protected abstract boolean lessThan(T a, T b);
/**
* Supplies the object used to pre-populate the queue. This default
* implementation returns <code>null</code>, in which case the constructor
* leaves the queue empty.
*/
protected T getSentinelObject() {
return null;
}
/**
* Adds an Object to a PriorityQueue in log(size) time. The capacity is not
* checked here: adding more elements than the backing array can hold (it is
* sized for maxSize elements) fails with an ArrayIndexOutOfBoundsException.
*
* @return the new 'top' element in the queue.
*/
public final T add(T element) {
size++;
heap[size] = element;
upHeap();
return heap[1];
}
/**
* Adds an Object to a PriorityQueue in log(size) time.
* It returns the object (if any) that was
* dropped off the heap because it was full. This can be
* the given parameter (in case it is smaller than the
* full heap's minimum, and couldn't be added), or another
* object that was previously the smallest value in the
* heap and now has been replaced by a larger one, or null
* if the queue wasn't yet full with maxSize elements.
*/
public T insertWithOverflow(T element) {
if (size < maxSize) {
add(element);
return null;
} else if (size > 0 && !lessThan(element, heap[1])) {
// queue is full and the new element is not below the current least one: replace the top
T ret = heap[1];
heap[1] = element;
updateTop();
return ret;
} else {
return element;
}
}
/** Returns the least element of the PriorityQueue in constant time. */
public final T top() {
// We don't need to check size here: if maxSize is 0,
// then heap is length 2 array with both entries null.
// If size is 0 then heap[1] is already null.
return heap[1];
}
/** Removes and returns the least element of the PriorityQueue in log(size)
time. Returns <code>null</code> when the queue is empty. */
public final T pop() {
if (size > 0) {
T result = heap[1]; // save first value
heap[1] = heap[size]; // move last to first
heap[size] = null; // permit GC of objects
size--;
downHeap(); // adjust heap
return result;
} else {
return null;
}
}
/**
* Should be called when the Object at top changes values. Still log(n) worst
* case, but it's at least twice as fast to call
*
* <pre class="prettyprint">
* pq.top().change();
* pq.updateTop();
* </pre>
*
* instead of
*
* <pre class="prettyprint">
* o = pq.pop();
* o.change();
* pq.add(o);
* </pre>
*
* @return the new 'top' element.
*/
public final T updateTop() {
downHeap();
return heap[1];
}
/** Returns the number of elements currently stored in the PriorityQueue. */
public final int size() {
return size;
}
/** Removes all entries from the PriorityQueue. */
public final void clear() {
for (int i = 0; i <= size; i++) {
heap[i] = null;
}
size = 0;
}
/** Moves the element in the last used slot up until its parent is no longer greater than it. */
private final void upHeap() {
int i = size;
T node = heap[i]; // save bottom node
int j = i >>> 1;
while (j > 0 && lessThan(node, heap[j])) {
heap[i] = heap[j]; // shift parents down
i = j;
j = j >>> 1;
}
heap[i] = node; // install saved node
}
/** Moves the element in slot 1 down, always towards the smaller child, until no child is less than it. */
private final void downHeap() {
int i = 1;
T node = heap[i]; // save top node
int j = i << 1; // find smaller child
int k = j + 1;
if (k <= size && lessThan(heap[k], heap[j])) {
j = k;
}
while (j <= size && lessThan(heap[j], node)) {
heap[i] = heap[j]; // shift up child
i = j;
j = i << 1;
k = j + 1;
if (k <= size && lessThan(heap[k], heap[j])) {
j = k;
}
}
heap[i] = node; // install saved node
}
/** This method returns the internal heap array as Object[].
* The array itself is returned, not a copy.
* @lucene.internal
*/
public final Object[] getHeapArray() {
return (Object[]) heap;
}
}

View File

@ -50,6 +50,7 @@ class QuadValueSortDoc extends TripleValueSortDoc {
public void reset() {
this.docId = -1;
this.docBase = -1;
this.ord = -1;
value1.reset();
value2.reset();
value3.reset();

View File

@ -32,11 +32,6 @@ class SingleValueSortDoc extends SortDoc {
return null;
}
@Override
public SortValue[] getSortValues() {
return new SortValue[] { value1 };
}
public void setNextReader(LeafReaderContext context) throws IOException {
this.ord = context.ord;
this.docBase = context.docBase;
@ -46,6 +41,7 @@ class SingleValueSortDoc extends SortDoc {
public void reset() {
  // back to the "no document" state: every position marker becomes -1
  docId = -1;
  docBase = -1;
  ord = -1;
  // the single sort value is reset along with them
  value1.reset();
}
@ -88,7 +84,7 @@ class SingleValueSortDoc extends SortDoc {
}
public String toString() {
return docId+":"+value1.toString();
return ord + ":" + docBase + ":" + docId + ":val=" + value1.toString();
}
}

View File

@ -45,10 +45,6 @@ class SortDoc {
return null;
}
public SortValue[] getSortValues() {
return sortValues;
}
public void setNextReader(LeafReaderContext context) throws IOException {
this.ord = context.ord;
this.docBase = context.docBase;
@ -60,6 +56,7 @@ class SortDoc {
public void reset() {
this.docId = -1;
this.docBase = -1;
this.ord = -1;
for (SortValue value : sortValues) {
value.reset();
}
@ -67,7 +64,7 @@ class SortDoc {
public void setValues(int docId) throws IOException {
this.docId = docId;
for(SortValue sortValue : sortValues) {
for (SortValue sortValue : sortValues) {
sortValue.setCurrentValue(docId);
}
}
@ -77,14 +74,14 @@ class SortDoc {
this.ord = sortDoc.ord;
this.docBase = sortDoc.docBase;
SortValue[] vals = sortDoc.sortValues;
for(int i=0; i<vals.length; i++) {
for (int i = 0; i < vals.length; i++) {
sortValues[i].setCurrentValue(vals[i]);
}
}
public SortDoc copy() {
SortValue[] svs = new SortValue[sortValues.length];
for(int i=0; i<sortValues.length; i++) {
for (int i = 0; i < sortValues.length; i++) {
svs[i] = sortValues[i].copy();
}
@ -92,12 +89,12 @@ class SortDoc {
}
public boolean lessThan(Object o) {
if(docId == -1) {
if (docId == -1) {
return true;
}
SortDoc sd = (SortDoc)o;
SortValue[] sortValues1 = sd.sortValues;
for(int i=0; i<sortValues.length; i++) {
for (int i = 0; i < sortValues.length; i++) {
int comp = sortValues[i].compareTo(sortValues1[i]);
if (comp < 0) {
return true;
@ -105,12 +102,12 @@ class SortDoc {
return false;
}
}
return docId+docBase > sd.docId+sd.docBase; //index order
return docId + docBase > sd.docId + sd.docBase; //index order
}
public int compareTo(Object o) {
SortDoc sd = (SortDoc)o;
for (int i=0; i<sortValues.length; i++) {
for (int i = 0; i < sortValues.length; i++) {
int comp = sortValues[i].compareTo(sd.sortValues[i]);
if (comp != 0) {
return comp;
@ -122,8 +119,8 @@ class SortDoc {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("docId: ").append(docId).append("; ");
for (int i=0; i < sortValues.length; i++) {
builder.append(ord).append(':').append(docBase).append(':').append(docId).append("; ");
for (int i = 0; i < sortValues.length; i++) {
builder.append("value").append(i).append(": ").append(sortValues[i]).append(", ");
}
return builder.toString();

View File

@ -17,22 +17,53 @@
package org.apache.solr.handler.export;
class SortQueue extends PriorityQueue<SortDoc> {
import org.apache.lucene.util.ArrayUtil;
private SortDoc proto;
private Object[] cache;
/**
* Specialized class that reuses most of the code from {@link org.apache.lucene.util.PriorityQueue}
* but contains optimizations for the /export handler use.
*/
final class SortQueue {
public SortQueue(int len, SortDoc proto) {
super(len);
protected int size = 0;
protected final int maxSize;
private final SortDoc[] heap;
private final SortDoc proto;
private SortDoc[] cache;
public SortQueue(int maxSize, SortDoc proto) {
this.proto = proto;
final int heapSize;
if (0 == maxSize) {
// We allocate 1 extra to avoid if statement in top()
heapSize = 2;
} else {
if (maxSize > ArrayUtil.MAX_ARRAY_LENGTH) {
// Don't wrap heapSize to -1, in this case, which
// causes a confusing NegativeArraySizeException.
// Note that very likely this will simply then hit
// an OOME, but at least that's more indicative to
// caller that this values is too big. We don't +1
// in this case, but it's very unlikely in practice
// one will actually insert this many objects into
// the PQ:
// Throw exception to prevent confusing OOME:
throw new IllegalArgumentException("maxSize must be <= " + ArrayUtil.MAX_ARRAY_LENGTH + "; got: " + maxSize);
} else {
// NOTE: we add +1 because all access to heap is
// 1-based not 0-based. heap[0] is unused.
heapSize = maxSize + 1;
}
}
this.heap = new SortDoc[heapSize];
this.maxSize = maxSize;
}
protected boolean lessThan(SortDoc t1, SortDoc t2) {
private static final boolean lessThan(SortDoc t1, SortDoc t2) {
return t1.lessThan(t2);
}
protected void populate() {
Object[] heap = getHeapArray();
cache = new SortDoc[heap.length];
for (int i = 1; i < heap.length; i++) {
cache[i] = heap[i] = proto.copy();
@ -41,12 +72,118 @@ class SortQueue extends PriorityQueue<SortDoc> {
}
protected void reset() {
Object[] heap = getHeapArray();
if(cache != null) {
if (cache != null) {
System.arraycopy(cache, 1, heap, 1, heap.length-1);
size = maxSize;
} else {
populate();
}
}
// ==================
/**
* Adds an element to the queue in log(size) time. The capacity is not
* checked: adding more elements than the backing array can hold (it is
* sized for maxSize elements) fails with an ArrayIndexOutOfBoundsException.
*
* @return the new 'top' element in the queue.
*/
public final SortDoc add(SortDoc element) {
  // claim the next free slot, then let the new element bubble up to its place
  final int slot = ++size;
  heap[slot] = element;
  upHeap();
  return heap[1];
}
/** Returns the least element of the PriorityQueue in constant time. */
public final SortDoc top() {
  // No size check is needed: a queue built with maxSize 0 still has a
  // two-slot array whose entries are null, and pop() nulls the slot it vacates.
  final SortDoc least = heap[1];
  return least;
}
/** Removes and returns the least element of the PriorityQueue in log(size)
time. */
public final SortDoc pop() {
  if (size <= 0) {
    // nothing stored
    return null;
  }
  final SortDoc least = heap[1];
  // the last element takes over the root; its old slot is cleared so it can be collected
  heap[1] = heap[size];
  heap[size] = null;
  size--;
  // restore the heap order starting from the root
  downHeap();
  return least;
}
/**
* Should be called when the Object at top changes values. Still log(n) worst
* case, but it's at least twice as fast to call
*
* <pre class="prettyprint">
* pq.top().change();
* pq.updateTop();
* </pre>
*
* instead of
*
* <pre class="prettyprint">
* o = pq.pop();
* o.change();
* pq.add(o);
* </pre>
*
* @return the new 'top' element.
*/
public final SortDoc updateTop() {
  // the root may no longer be the least element: sift it down first
  downHeap();
  final SortDoc newTop = heap[1];
  return newTop;
}
/** Returns the number of elements currently stored in the PriorityQueue. */
public final int size() {
  return this.size;
}
/** Removes all entries from the PriorityQueue. */
public final void clear() {
  // null out slots 0..size (slot 0 is unused but included, as before)
  int slot = 0;
  while (slot <= size) {
    heap[slot] = null;
    slot++;
  }
  size = 0;
}
/** Moves the element in the last used slot up until its parent is no longer greater than it. */
private final void upHeap() {
  int child = size;
  final SortDoc moving = heap[child]; // element being placed
  for (int parent = child >>> 1; parent > 0 && lessThan(moving, heap[parent]); parent >>>= 1) {
    heap[child] = heap[parent]; // pull the parent down one level
    child = parent;
  }
  heap[child] = moving; // final position
}
/** Moves the element in slot 1 down, always towards the smaller child, until no child is less than it. */
private final void downHeap() {
  int hole = 1;
  final SortDoc moving = heap[hole]; // element being placed
  while (true) {
    int child = hole << 1; // left child
    final int sibling = child + 1; // right child
    if (sibling <= size && lessThan(heap[sibling], heap[child])) {
      child = sibling; // right child is the smaller one
    }
    if (child > size || !lessThan(heap[child], moving)) {
      break; // no child left, or heap order already holds
    }
    heap[hole] = heap[child]; // pull the smaller child up one level
    hole = child;
  }
  heap[hole] = moving; // final position
}
}

View File

@ -18,6 +18,8 @@
package org.apache.solr.handler.export;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
@ -32,6 +34,7 @@ import org.apache.solr.schema.FieldType;
class StringFieldWriter extends FieldWriter {
private String field;
private FieldType fieldType;
private Map<Integer, SortedDocValues> lastDocValues = new HashMap<>();
private CharsRefBuilder cref = new CharsRefBuilder();
final ByteArrayUtf8CharSequence utf8 = new ByteArrayUtf8CharSequence(new byte[0], 0, 0) {
@Override
@ -61,7 +64,11 @@ class StringFieldWriter extends FieldWriter {
}
} else {
// field is not part of 'sort' param, but part of 'fl' param
SortedDocValues vals = DocValues.getSorted(reader, this.field);
SortedDocValues vals = lastDocValues.get(sortDoc.ord);
if (vals == null || vals.docID() >= sortDoc.docId) {
vals = DocValues.getSorted(reader, this.field);
lastDocValues.put(sortDoc.ord, vals);
}
if (vals.advance(sortDoc.docId) != sortDoc.docId) {
return false;
}
@ -73,9 +80,9 @@ class StringFieldWriter extends FieldWriter {
ew.put(this.field, utf8.reset(ref.bytes, ref.offset, ref.length, null));
} else {
String v = null;
if(sortValue != null) {
if (sortValue != null) {
v = ((StringValue) sortValue).getLastString();
if(v == null) {
if (v == null) {
fieldType.indexedToReadable(ref, cref);
v = cref.toString();
((StringValue) sortValue).setLastString(v);

View File

@ -29,15 +29,16 @@ import org.apache.lucene.util.LongValues;
class StringValue implements SortValue {
protected SortedDocValues globalDocValues;
private final SortedDocValues globalDocValues;
private final OrdinalMap ordinalMap;
private final String field;
private final IntComp comp;
protected OrdinalMap ordinalMap;
protected LongValues toGlobal = LongValues.IDENTITY; // this segment to global ordinal. NN;
protected SortedDocValues docValues;
protected String field;
protected int currentOrd;
protected IntComp comp;
protected int lastDocID;
private boolean present;
@ -50,6 +51,8 @@ class StringValue implements SortValue {
this.docValues = globalDocValues;
if (globalDocValues instanceof MultiDocValues.MultiSortedDocValues) {
this.ordinalMap = ((MultiDocValues.MultiSortedDocValues) globalDocValues).mapping;
} else {
this.ordinalMap = null;
}
this.field = field;
this.comp = comp;
@ -66,7 +69,8 @@ class StringValue implements SortValue {
}
public StringValue copy() {
return new StringValue(globalDocValues, field, comp);
StringValue copy = new StringValue(globalDocValues, field, comp);
return copy;
}
public void setCurrentValue(int docId) throws IOException {

View File

@ -47,6 +47,7 @@ class TripleValueSortDoc extends DoubleValueSortDoc {
public void reset() {
this.docId = -1;
this.docBase = -1;
this.ord = -1;
value1.reset();
value2.reset();
value3.reset();

View File

@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.StreamParams;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.Utils;
import org.apache.solr.index.LogDocMergePolicyFactory;
@ -709,8 +710,8 @@ public class TestExportWriter extends SolrTestCaseJ4 {
}
private void createLargeIndex() throws Exception {
int BATCH_SIZE = 1000;
int NUM_BATCHES = 100;
int BATCH_SIZE = 5000;
int NUM_BATCHES = 20;
SolrInputDocument[] docs = new SolrInputDocument[BATCH_SIZE];
for (int i = 0; i < NUM_BATCHES; i++) {
for (int j = 0; j < BATCH_SIZE; j++) {
@ -720,7 +721,7 @@ public class TestExportWriter extends SolrTestCaseJ4 {
"random_i_p", String.valueOf(random().nextInt(BATCH_SIZE)),
"sortabledv", TestUtil.randomSimpleString(random(), 2, 3),
"sortabledv_udvas", String.valueOf(random().nextInt(100)),
"small_i_p", String.valueOf((i + j) % 7)
"small_i_p", String.valueOf((i + j) % 37)
);
}
updateJ(jsonAdd(docs), null);
@ -758,6 +759,30 @@ public class TestExportWriter extends SolrTestCaseJ4 {
}
assertTrue("missing value " + i + " in results", found);
}
req = req("q", "*:*", "qt", "/export", "fl", "id,sortabledv_udvas,small_i_p", "sort", "sortabledv_udvas asc", "expr", "rollup(input(),over=\"sortabledv_udvas\", sum(small_i_p),avg(small_i_p),min(small_i_p),count(*))");
rsp = h.query(req);
rspMap = mapper.readValue(rsp, HashMap.class);
docs = (List<Map<String, Object>>) Utils.getObjectByPath(rspMap, false, "/response/docs");
assertNotNull("missing document results: " + rspMap, docs);
assertEquals("wrong number of unique docs", 100, docs.size());
for (Map<String, Object> doc : docs) {
assertNotNull("missing sum: " + doc, doc.get("sum(small_i_p)"));
assertEquals(18000.0, ((Number)doc.get("sum(small_i_p)")).doubleValue(), 2500.0);
assertNotNull("missing avg: " + doc, doc.get("avg(small_i_p)"));
assertEquals(18.0, ((Number)doc.get("avg(small_i_p)")).doubleValue(), 2.5);
assertNotNull("missing count: " + doc, doc.get("count(*)"));
assertEquals(1000.0, ((Number)doc.get("count(*)")).doubleValue(), 200.0);
}
// try invalid field types
req = req("q", "*:*", "qt", "/export", "fl", "id,sortabledv,small_i_p", "sort", "sortabledv asc", "expr", "unique(input(),over=\"sortabledv\")");
rsp = h.query(req);
rspMap = mapper.readValue(rsp, HashMap.class);
assertEquals("wrong response status", 400, ((Number)Utils.getObjectByPath(rspMap, false, "/responseHeader/status")).intValue());
docs = (List<Map<String, Object>>) Utils.getObjectByPath(rspMap, false, "/response/docs");
assertEquals("wrong number of docs", 1, docs.size());
Map<String, Object> doc = docs.get(0);
assertTrue("doc doesn't have exception", doc.containsKey(StreamParams.EXCEPTION));
assertTrue("wrong exception message", doc.get(StreamParams.EXCEPTION).toString().contains("Must have useDocValuesAsStored='true'"));
}
private void validateSort(int numDocs) throws Exception {

View File

@ -39,6 +39,8 @@ You can use `/export` to make requests to export the result set of a query.
All queries must include `sort` and `fl` parameters, or the query will return an error. Filter queries are also supported.
Optional parameter `batchSize` determines the size of the internal buffers for partial results. The default value is 30000 but users may want to specify smaller values to limit the memory use (at the cost of degraded performance) or higher values to improve export performance (the relationship is not linear and larger values don't bring proportionally larger performance increases).
The supported response writers are `json` and `javabin`. For backward compatibility reasons `wt=xsort` is also supported as input, but `wt=xsort` behaves the same as `wt=json`. The default output format is `json`.
Here is an example of an export request of some indexed log data:

View File

@ -17,6 +17,8 @@
package org.apache.solr.client.solrj.io;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
@ -94,6 +96,10 @@ public class Tuple implements Cloneable, MapWriter {
throw new RuntimeException("must have a matching number of key-value pairs");
}
for (int i = 0; i < fields.length; i += 2) {
// skip empty entries
if (fields[i] == null) {
continue;
}
put(fields[i], fields[i + 1]);
}
}
@ -330,4 +336,15 @@ public class Tuple implements Cloneable, MapWriter {
}
return tuple;
}
/**
* Create a new empty tuple marked as EXCEPTION and optionally EOF.
* @param t exception - full stack trace will be used as an exception message
* @param eof if true the tuple will be marked as EOF
*/
public static Tuple EXCEPTION(Throwable t, boolean eof) {
  // render the full stack trace into an in-memory string
  final StringWriter trace = new StringWriter();
  final PrintWriter printer = new PrintWriter(trace);
  t.printStackTrace(printer);
  // delegate to the String-based factory with the rendered trace as the message
  return EXCEPTION(trace.toString(), eof);
}
}