SOLR-13682: command line option to export documents to a file

noble 2019-08-10 11:57:15 +10:00
parent 382627d94b
commit aec2eb0c9d
7 changed files with 2577 additions and 2047 deletions

solr/CHANGES.txt

@@ -57,6 +57,8 @@ New Features
* SOLR-11866: QueryElevationComponent can have query rules configured with match="subset" wherein the words need only
match a subset of the query's words and in any order. (Bruno Roustant via David Smiley)
* SOLR-13682: command line option to export documents to a file (noble)

Improvements
----------------------

solr/bin/solr

@@ -312,7 +312,7 @@ function print_usage() {
if [ -z "$CMD" ]; then
echo ""
echo "Usage: solr COMMAND OPTIONS"
echo " where COMMAND is one of: start, stop, restart, status, healthcheck, create, create_core, create_collection, delete, version, zk, auth, assert, config, autoscaling"
echo " where COMMAND is one of: start, stop, restart, status, healthcheck, create, create_core, create_collection, delete, version, zk, auth, assert, config, autoscaling, export"
echo ""
echo " Standalone server example (start Solr running in the background on port 8984):"
echo ""
@@ -1349,6 +1349,11 @@ if [[ "$SCRIPT_CMD" == "autoscaling" ]]; then
exit $?
fi
if [[ "$SCRIPT_CMD" == "export" ]]; then
run_tool export "$@"
exit $?
fi
if [[ "$SCRIPT_CMD" == "auth" ]]; then
VERBOSE=""

solr/bin/solr.cmd

@@ -214,6 +214,7 @@ IF "%1"=="-v" goto get_version
IF "%1"=="-version" goto get_version
IF "%1"=="assert" goto run_assert
IF "%1"=="autoscaling" goto run_autoscaling
IF "%1"=="export" goto run_export
REM Only allow the command to be the first argument, assume start if not supplied
IF "%1"=="start" goto set_script_cmd
@@ -290,7 +291,7 @@ goto done
:script_usage
@echo.
@echo Usage: solr COMMAND OPTIONS
@echo where COMMAND is one of: start, stop, restart, healthcheck, create, create_core, create_collection, delete, version, zk, auth, assert, config, autoscaling, export
@echo.
@echo Standalone server example (start Solr running in the background on port 8984):
@echo.
@@ -1412,7 +1413,14 @@ goto done
-Dlog4j.configurationFile="file:///%DEFAULT_SERVER_DIR%\resources\log4j2-console.xml" ^
-classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
org.apache.solr.util.SolrCLI %*
goto done
:run_export
"%JAVA%" %SOLR_SSL_OPTS% %AUTHC_OPTS% %SOLR_ZK_CREDS_AND_ACLS% -Dsolr.install.dir="%SOLR_TIP%" ^
-Dlog4j.configurationFile="file:///%DEFAULT_SERVER_DIR%\resources\log4j2-console.xml" ^
-classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
org.apache.solr.util.SolrCLI %*
goto done
:parse_config_args
IF [%1]==[] goto run_config

solr/core/src/java/org/apache/solr/util/ExportTool.java

@@ -0,0 +1,328 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.util;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.StreamingResponseCallback;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.StreamingBinaryResponseParser;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CursorMarkParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.util.FastWriter;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrJSONWriter;
import static org.apache.solr.common.params.CommonParams.FL;
import static org.apache.solr.common.params.CommonParams.JAVABIN;
public class ExportTool extends SolrCLI.ToolBase {
@Override
public String getName() {
return "export";
}
@Override
public Option[] getOptions() {
return OPTIONS;
}
public static class Info {
String baseurl;
String format;
String query;
String coll;
String out;
String fields;
long limit = 100;
long docsWritten = 0;
PrintStream output;
//for testing purposes only
public SolrClient solrClient;
public Info(String url) {
setUrl(url);
setOutFormat(null, "jsonl");
}
public void setUrl(String url) {
int idx = url.lastIndexOf('/');
baseurl = url.substring(0, idx);
coll = url.substring(idx + 1);
query = "*:*";
}
public void setLimit(String maxDocsStr) {
limit = Long.parseLong(maxDocsStr);
if (limit == -1) limit = Long.MAX_VALUE;
}
public void setOutFormat(String out, String format) {
if (format == null) format = "jsonl";
if (!formats.contains(format)) {
throw new IllegalArgumentException("format must be one of: " + formats);
}
this.format = format;
this.out = out;
if (this.out == null) {
this.out = JAVABIN.equals(format) ?
coll + ".javabin" :
coll + ".json";
}
}
DocsSink getSink() {
return JAVABIN.equals(format) ? new JavabinSink(this) : new JsonSink(this);
}
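// Streams the entire result set using cursorMark deep paging: the query is
// sorted on the collection's uniqueKey field and re-issued with the cursor
// returned by the previous page until the cursor stops advancing or the
// requested limit is reached.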
void exportDocsWithCursorMark() throws SolrServerException, IOException {
DocsSink sink = getSink();
solrClient = new CloudSolrClient.Builder(Collections.singletonList(baseurl)).build();
NamedList<Object> rsp1 = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey",
new MapSolrParams(Collections.singletonMap("collection", coll))));
String uniqueKey = (String) rsp1.get("uniqueKey");
sink.start();
try {
NamedList<Object> rsp;
SolrQuery q = (new SolrQuery(query))
.setParam("collection", coll)
.setRows(100)
.setSort(SolrQuery.SortClause.asc(uniqueKey));
if (fields != null) {
q.setParam(FL, fields);
}
String cursorMark = CursorMarkParams.CURSOR_MARK_START;
boolean done = false;
StreamingResponseCallback streamer = getStreamer(sink);
if (output != null) output.println("Exporting data to: " + out);
while (!done) {
if (docsWritten >= limit) break;
QueryRequest request = new QueryRequest(q);
request.setResponseParser(new StreamingBinaryResponseParser(streamer));
q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
rsp = solrClient.request(request);
String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT);
if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) {
break;
}
cursorMark = nextCursorMark;
if (output != null) output.print(".");
}
if (output != null) output.println("\n DONE!");
} finally {
sink.end();
solrClient.close();
}
}
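// Wraps the sink in a streaming callback so each document is written out as it
// is read off the wire, instead of buffering the whole response in memory.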
private StreamingResponseCallback getStreamer(DocsSink sink) {
return new StreamingResponseCallback() {
@Override
public void streamSolrDocument(SolrDocument doc) {
try {
sink.accept(doc);
} catch (IOException e) {
throw new RuntimeException(e);
}
docsWritten++;
}
@Override
public void streamDocListInfo(long numFound, long start, Float maxScore) {
}
};
}
}
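// Supported export formats: javabin (Solr's native binary format) and jsonl
// (one JSON object per line).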
static Set<String> formats = ImmutableSet.of(JAVABIN, "jsonl");
@Override
protected void runImpl(CommandLine cli) throws Exception {
Info info = new Info(cli.getOptionValue("url"));
info.query = cli.getOptionValue("query", "*:*");
info.setOutFormat(cli.getOptionValue("out"), cli.getOptionValue("format"));
info.fields = cli.getOptionValue("fields");
info.setLimit(cli.getOptionValue("limit", "100"));
info.output = super.stdout;
info.exportDocsWithCursorMark();
}
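// A sink receives streamed documents one at a time and writes them to the
// output file in a particular format.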
interface DocsSink {
void start() throws IOException;
void accept(SolrDocument document) throws IOException;
void end() throws IOException;
}
private static final Option[] OPTIONS = {
OptionBuilder
.hasArg()
.isRequired(true)
.withDescription("Address of the collection, example http://localhost:8983/solr/gettingstarted")
.create("url"),
OptionBuilder
.hasArg()
.isRequired(false)
.withDescription("file name . defaults to collection-name.<format>")
.create("out"),
OptionBuilder
.hasArg()
.isRequired(false)
.withDescription("format json/javabin, default to json. file extension would be .json")
.create("format"),
OptionBuilder
.hasArg()
.isRequired(false)
.withDescription("Max number of docs to download. default = 100, use -1 for all docs")
.create("limit"),
OptionBuilder
.hasArg()
.isRequired(false)
.withDescription("A custom query, default is *:*")
.create("query"),
OptionBuilder
.hasArg()
.isRequired(false)
.withDescription("Comma separated fields. By default all fields are fetched")
.create("fields")
};
private static class JsonSink implements DocsSink {
private final Info info;
private SolrJSONWriter jsonw;
private FastWriter writer;
private FileOutputStream fos;
public JsonSink(Info info) {
this.info = info;
}
@Override
public void start() throws IOException {
fos = new FileOutputStream(info.out);
writer = FastWriter.wrap(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
jsonw = new SolrJSONWriter(writer);
jsonw.setIndent(false);
}
@Override
public void end() throws IOException {
jsonw.close();
fos.close();
}
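// Writes one JSON object per line (jsonl). The internal _version_ field is
// skipped, and single-element multi-valued fields are unwrapped to a scalar.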
@Override
public void accept(SolrDocument doc) throws IOException {
Map<String, Object> m = new LinkedHashMap<>(doc.size());
doc.forEach((s, field) -> {
if (s.equals("_version_")) return;
if (field instanceof List) {
if (((List) field).size() == 1) {
field = ((List) field).get(0);
}
}
m.put(s, field);
});
jsonw.writeObj(m);
writer.append('\n');
writer.flush();
}
}
private static class JavabinSink implements DocsSink {
private final Info info;
JavaBinCodec codec;
FileOutputStream fos;
public JavabinSink(Info info) {
this.info = info;
}
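// The output mimics a javabin update request (a named list holding "params"
// and an open-ended "docs" iterator), so the file can be unmarshalled with
// JavaBinUpdateRequestCodec or posted directly to /update.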
@Override
public void start() throws IOException {
fos = new FileOutputStream(info.out);
codec = new JavaBinCodec(fos, null);
codec.writeTag(JavaBinCodec.NAMED_LST, 2);
codec.writeStr("params");
codec.writeNamedList(new NamedList<>());
codec.writeStr("docs");
codec.writeTag(JavaBinCodec.ITERATOR);
}
@Override
public void end() throws IOException {
codec.writeTag(JavaBinCodec.END);
codec.close();
fos.close();
}
@Override
public void accept(SolrDocument doc) throws IOException {
SolrInputDocument document = new SolrInputDocument();
doc.forEach((s, o) -> {
if (s.equals("_version_")) return;
if (o instanceof List) {
if (((List) o).size() == 1) o = ((List) o).get(0);
}
document.addField(s, o);
});
codec.writeSolrInputDocument(document);
}
}
}

solr/core/src/test/org/apache/solr/util/TestExportTool.java

@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.JsonRecordReader;
@SolrTestCaseJ4.SuppressSSL
public class TestExportTool extends SolrCloudTestCase {
public void testBasic() throws Exception {
String COLLECTION_NAME = "globalLoaderColl";
MiniSolrCloudCluster cluster = configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
.configure();
try {
CollectionAdminRequest
.createCollection(COLLECTION_NAME, "conf", 2, 1)
.setMaxShardsPerNode(100)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
String tmpFileLoc = new File(cluster.getBaseDir().toFile().getAbsolutePath() +
File.separator).getPath();
UpdateRequest ur = new UpdateRequest();
ur.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
int docCount = 1000;
for (int i = 0; i < docCount; i++) {
ur.add("id", String.valueOf(i), "desc_s", TestUtil.randomSimpleString(random(), 10, 50));
}
cluster.getSolrClient().request(ur, COLLECTION_NAME);
QueryResponse qr = cluster.getSolrClient().query(COLLECTION_NAME, new SolrQuery("*:*").setRows(0));
assertEquals(docCount, qr.getResults().getNumFound());
String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
ExportTool.Info info = new ExportTool.Info(url);
String absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
info.setOutFormat(absolutePath, "jsonl");
info.setLimit("200");
info.exportDocsWithCursorMark();
assertTrue(info.docsWritten >= 200);
JsonRecordReader jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
Reader rdr = new InputStreamReader(new FileInputStream(absolutePath), StandardCharsets.UTF_8);
try {
int[] count = new int[]{0};
jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
assertTrue(count[0] >= 200);
} finally {
rdr.close();
}
info = new ExportTool.Info(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
info.setOutFormat(absolutePath, "jsonl");
info.setLimit("-1");
info.exportDocsWithCursorMark();
assertTrue(info.docsWritten >= 1000);
jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
rdr = new InputStreamReader(new FileInputStream(absolutePath), StandardCharsets.UTF_8);
try {
int[] count = new int[]{0};
jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
assertTrue(count[0] >= 1000);
} finally {
rdr.close();
}
info = new ExportTool.Info(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
info.setOutFormat(absolutePath, "javabin");
info.setLimit("200");
info.exportDocsWithCursorMark();
assertTrue(info.docsWritten >= 200);
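// Verify the javabin export by unmarshalling it with JavaBinUpdateRequestCodec,
// counting one callback per streamed document.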
FileInputStream fis = new FileInputStream(absolutePath);
try {
int[] count = new int[]{0};
FastInputStream in = FastInputStream.wrap(fis);
new JavaBinUpdateRequestCodec()
.unmarshal(in, (document, req, commitWithin, override) -> count[0]++);
assertTrue(count[0] >= 200);
} finally {
fis.close();
}
info = new ExportTool.Info(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
info.setOutFormat(absolutePath, "javabin");
info.setLimit("-1");
info.exportDocsWithCursorMark();
assertTrue(info.docsWritten >= 1000);
fis = new FileInputStream(absolutePath);
try {
int[] count = new int[]{0};
FastInputStream in = FastInputStream.wrap(fis);
new JavaBinUpdateRequestCodec()
.unmarshal(in, (document, req, commitWithin, override) -> count[0]++);
assertTrue(count[0] >= 1000);
} finally {
fis.close();
}
} finally {
cluster.shutdown();
}
}
}

solr/solr-ref-guide/src/solr-control-script-reference.adoc

@@ -879,3 +879,30 @@ Examples of this command:
`bin/solr zk mkroot /solr -z 123.321.23.43:2181`
`bin/solr zk mkroot /solr/production`
== Exporting data to a file

*Example*

Export all documents from the collection `gettingstarted` to a file called `gettingstarted.json`:

`bin/solr export -url http://localhost:8983/solr/gettingstarted -limit -1`

*Arguments*

* `format`: `jsonl` (default) or `javabin`. `format=javabin` exports to a file with the extension `.javabin`, which is the native Solr format; it is more compact and faster to import.
* `out`: the export file name. Defaults to `<collection-name>.json` or `<collection-name>.javabin`.
* `query`: a custom query; the default is `*:*`, which exports all documents.
* `fields`: a comma-separated list of fields to export. By default all fields are fetched.
* `limit`: the number of documents to export. The default is 100; use `-1` to export all documents.
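
The export can also be driven programmatically through the `ExportTool.Info` helper class. The following is a minimal sketch of that usage (it mirrors how `TestExportTool` exercises the class); the URL and output file name are placeholders:

[source,java]
----
// Sketch: export a collection from Java, equivalent to the CLI call above.
ExportTool.Info info = new ExportTool.Info("http://localhost:8983/solr/gettingstarted");
info.setOutFormat("gettingstarted.json", "jsonl"); // or "javabin"
info.setLimit("-1");                               // -1 exports all documents
info.exportDocsWithCursorMark();
----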

=== Importing the data into a collection

*Example: importing the `jsonl` files*

`curl -X POST -d @gettingstarted.json http://localhost:8983/solr/gettingstarted/update/json/docs?commit=true`

*Example: importing the `javabin` files*

`curl -X POST --header "Content-Type: application/javabin" --data-binary @gettingstarted.javabin http://localhost:8983/solr/gettingstarted/update?commit=true`

solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java

@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
/**
* Represent the field-value information needed to construct and index
@@ -48,7 +49,14 @@ public class SolrInputDocument extends SolrDocumentBase<SolrInputField, SolrInputDocument>
@Override
public void writeMap(EntryWriter ew) throws IOException {
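// Unwrap each SolrInputField to its underlying value before handing it to the
// entry writer, so map-style serialization emits plain field values.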
BiConsumer<CharSequence, Object> bc = ew.getBiConsumer();
BiConsumer<CharSequence, Object> wrapper = (k, o) -> {
if (o instanceof SolrInputField) {
o = ((SolrInputField) o).getValue();
}
bc.accept(k, o);
};
_fields.forEach(wrapper);
}
public SolrInputDocument(Map<String,SolrInputField> fields) {