SOLR-13688: Run the bin/solr export command multithreaded

This commit is contained in:
noble 2019-08-13 18:54:05 +10:00
parent 06dd37e907
commit 03fd3d3923
2 changed files with 402 additions and 138 deletions

View File

@ -17,42 +17,65 @@
package org.apache.solr.util;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.StreamingResponseCallback;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.StreamingBinaryResponseParser;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CursorMarkParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.util.FastWriter;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrJSONWriter;
import org.apache.solr.common.util.StrUtils;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import static org.apache.solr.common.params.CommonParams.FL;
import static org.apache.solr.common.params.CommonParams.JAVABIN;
import static org.apache.solr.common.params.CommonParams.Q;
import static org.apache.solr.common.params.CommonParams.SORT;
import static org.apache.solr.common.util.JavaBinCodec.SOLRINPUTDOC;
public class ExportTool extends SolrCLI.ToolBase {
@Override
@ -65,7 +88,7 @@ public class ExportTool extends SolrCLI.ToolBase {
return OPTIONS;
}
public static class Info {
public static abstract class Info {
String baseurl;
String format;
String query;
@ -73,10 +96,12 @@ public class ExportTool extends SolrCLI.ToolBase {
String out;
String fields;
long limit = 100;
long docsWritten = 0;
AtomicLong docsWritten = new AtomicLong(0);
int bufferSize = 1024 * 1024;
PrintStream output;
//for testing purposes only
public SolrClient solrClient;
String uniqueKey;
CloudSolrClient solrClient;
DocsSink sink;
public Info(String url) {
@ -117,60 +142,24 @@ public class ExportTool extends SolrCLI.ToolBase {
return JAVABIN.equals(format) ? new JavabinSink(this) : new JsonSink(this);
}
void exportDocsWithCursorMark() throws SolrServerException, IOException {
DocsSink sink = getSink();
abstract void exportDocs() throws Exception;
void fetchUniqueKey() throws SolrServerException, IOException {
solrClient = new CloudSolrClient.Builder(Collections.singletonList(baseurl)).build();
NamedList<Object> rsp1 = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey",
NamedList<Object> response = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey",
new MapSolrParams(Collections.singletonMap("collection", coll))));
String uniqueKey = (String) rsp1.get("uniqueKey");
sink.start();
try {
NamedList<Object> rsp;
SolrQuery q = (new SolrQuery(query))
.setParam("collection", coll)
.setRows(100)
.setSort(SolrQuery.SortClause.asc(uniqueKey));
if (fields != null) {
q.setParam(FL, fields);
}
String cursorMark = CursorMarkParams.CURSOR_MARK_START;
boolean done = false;
StreamingResponseCallback streamer = getStreamer(sink);
if(output!= null) output.println("Exporting data to : "+ out);
while (!done) {
if (docsWritten >= limit) break;
QueryRequest request = new QueryRequest(q);
request.setResponseParser(new StreamingBinaryResponseParser(streamer));
q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
rsp = solrClient.request(request);
String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT);
if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) {
break;
}
cursorMark = nextCursorMark;
if(output!= null) output.print(".");
}
if(output!= null) output.println("\n DONE!");
} finally {
sink.end();
solrClient.close();
}
uniqueKey = (String) response.get("uniqueKey");
}
private StreamingResponseCallback getStreamer(DocsSink sink) {
public static StreamingResponseCallback getStreamer(Consumer<SolrDocument> sink) {
return new StreamingResponseCallback() {
@Override
public void streamSolrDocument(SolrDocument doc) {
try {
sink.accept(doc);
} catch (IOException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
docsWritten++;
}
@Override
@ -186,21 +175,24 @@ public class ExportTool extends SolrCLI.ToolBase {
@Override
protected void runImpl(CommandLine cli) throws Exception {
Info info = new Info(cli.getOptionValue("url"));
String url = cli.getOptionValue("url");
Info info = new MultiThreadedRunner(url);
info.query = cli.getOptionValue("query", "*:*");
info.setOutFormat(cli.getOptionValue("out"), cli.getOptionValue("format"));
info.fields = cli.getOptionValue("fields");
info.setLimit(cli.getOptionValue("limit", "100"));
info.output = super.stdout;
info.exportDocsWithCursorMark();
info.exportDocs();
}
interface DocsSink {
void start() throws IOException;
default void start() throws IOException {
}
void accept(SolrDocument document) throws IOException;
void accept(SolrDocument document) throws IOException, InterruptedException;
void end() throws IOException;
default void end() throws IOException {
}
}
private static final Option[] OPTIONS = {
@ -236,11 +228,13 @@ public class ExportTool extends SolrCLI.ToolBase {
.create("fields")
};
private static class JsonSink implements DocsSink {
static class JsonSink implements DocsSink {
private final Info info;
private SolrJSONWriter jsonw;
private FastWriter writer;
private FileOutputStream fos;
private CharArr charArr = new CharArr(1024 * 2);
JSONWriter jsonWriter = new JSONWriter(charArr, -1);
private Writer writer;
private OutputStream fos;
public AtomicLong docs = new AtomicLong();
public JsonSink(Info info) {
this.info = info;
@ -249,24 +243,27 @@ public class ExportTool extends SolrCLI.ToolBase {
@Override
public void start() throws IOException {
fos = new FileOutputStream(info.out);
writer = FastWriter.wrap(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
jsonw = new SolrJSONWriter(writer);
jsonw.setIndent(false);
if (info.bufferSize > 0) {
fos = new BufferedOutputStream(fos, info.bufferSize);
}
writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
}
@Override
public void end() throws IOException {
jsonw.close();
writer.flush();
fos.flush();
fos.close();
}
@Override
public void accept(SolrDocument doc) throws IOException {
public synchronized void accept(SolrDocument doc) throws IOException {
docs.incrementAndGet();
charArr.reset();
Map m = new LinkedHashMap(doc.size());
doc.forEach((s, field) -> {
if (s.equals("_version_")) return;
if (s.equals("_version_") || s.equals("_roor_")) return;
if (field instanceof List) {
if (((List) field).size() == 1) {
field = ((List) field).get(0);
@ -274,17 +271,16 @@ public class ExportTool extends SolrCLI.ToolBase {
}
m.put(s, field);
});
jsonw.writeObj(m);
writer.flush();
jsonWriter.write(m);
writer.write(charArr.getArray(), charArr.getStart(), charArr.getEnd());
writer.append('\n');
}
}
private static class JavabinSink implements DocsSink {
private final Info info;
JavaBinCodec codec;
FileOutputStream fos;
OutputStream fos;
public JavabinSink(Info info) {
this.info = info;
@ -293,6 +289,9 @@ public class ExportTool extends SolrCLI.ToolBase {
@Override
public void start() throws IOException {
fos = new FileOutputStream(info.out);
if (info.bufferSize > 0) {
fos = new BufferedOutputStream(fos, info.bufferSize);
}
codec = new JavaBinCodec(fos, null);
codec.writeTag(JavaBinCodec.NAMED_LST, 2);
codec.writeStr("params");
@ -306,23 +305,208 @@ public class ExportTool extends SolrCLI.ToolBase {
public void end() throws IOException {
codec.writeTag(JavaBinCodec.END);
codec.close();
fos.flush();
fos.close();
}
private BiConsumer<String, Object> bic= new BiConsumer<>() {
@Override
public void accept(String s, Object o) {
try {
if (s.equals("_version_") || s.equals("_root_")) return;
codec.writeExternString(s);
codec.writeVal(o);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
@Override
public void accept(SolrDocument doc) throws IOException {
SolrInputDocument document = new SolrInputDocument();
doc.forEach((s, o) -> {
if (s.equals("_version_")) return;
if (o instanceof List) {
if (((List) o).size() == 1) o = ((List) o).get(0);
}
document.addField(s, o);
});
codec.writeSolrInputDocument(document);
public synchronized void accept(SolrDocument doc) throws IOException {
int sz = doc.size();
if(doc.containsKey("_version_")) sz--;
if(doc.containsKey("_root_")) sz--;
codec.writeTag(SOLRINPUTDOC, sz);
codec.writeFloat(1f); // document boost
doc.forEach(bic);
}
}
static class MultiThreadedRunner extends Info {
ExecutorService producerThreadpool, consumerThreadpool;
ArrayBlockingQueue<SolrDocument> queue = new ArrayBlockingQueue(1000);
SolrDocument EOFDOC = new SolrDocument();
volatile boolean failed = false;
Map<String, CoreHandler> corehandlers = new HashMap();
public MultiThreadedRunner(String url) {
super(url);
}
@Override
void exportDocs() throws Exception {
sink = getSink();
fetchUniqueKey();
ClusterStateProvider stateProvider = solrClient.getClusterStateProvider();
DocCollection coll = stateProvider.getCollection(this.coll);
Map<String, Slice> m = coll.getSlicesMap();
producerThreadpool = ExecutorUtil.newMDCAwareFixedThreadPool(m.size(),
new DefaultSolrThreadFactory("solrcli-exporter-producers"));
consumerThreadpool = ExecutorUtil.newMDCAwareFixedThreadPool(1,
new DefaultSolrThreadFactory("solrcli-exporter-consumer"));
sink.start();
CountDownLatch consumerlatch = new CountDownLatch(1);
try {
addConsumer(consumerlatch);
addProducers(m);
if (output != null) {
output.println("NO of shards : " + corehandlers.size());
}
CountDownLatch producerLatch = new CountDownLatch(corehandlers.size());
corehandlers.forEach((s, coreHandler) -> producerThreadpool.submit(() -> {
try {
coreHandler.exportDocsFromCore();
} catch (Exception e) {
if(output != null) output.println("Error exporting docs from : "+s);
}
producerLatch.countDown();
}));
producerLatch.await();
queue.offer(EOFDOC, 10, TimeUnit.SECONDS);
consumerlatch.await();
} finally {
sink.end();
solrClient.close();
producerThreadpool.shutdownNow();
consumerThreadpool.shutdownNow();
if (failed) {
try {
Files.delete(new File(out).toPath());
} catch (IOException e) {
//ignore
}
}
}
}
private void addProducers(Map<String, Slice> m) {
for (Map.Entry<String, Slice> entry : m.entrySet()) {
Slice slice = entry.getValue();
Replica replica = slice.getLeader();
if (replica == null) replica = slice.getReplicas().iterator().next();// get a random replica
CoreHandler coreHandler = new CoreHandler(replica);
corehandlers.put(replica.getCoreName(), coreHandler);
}
}
private void addConsumer(CountDownLatch consumerlatch) {
consumerThreadpool.submit(() -> {
while (true) {
SolrDocument doc = null;
try {
doc = queue.poll(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
if (output != null) output.println("Consumer interrupted");
failed = true;
break;
}
if (doc == EOFDOC) break;
try {
if (docsWritten.get() > limit) continue;
sink.accept(doc);
docsWritten.incrementAndGet();
} catch (Exception e) {
if (output != null) output.println("Failed to write to file " + e.getMessage());
failed = true;
}
}
consumerlatch.countDown();
});
}
class CoreHandler {
final Replica replica;
long expectedDocs;
AtomicLong receivedDocs = new AtomicLong();
CoreHandler(Replica replica) {
this.replica = replica;
}
boolean exportDocsFromCore()
throws IOException, SolrServerException {
HttpSolrClient client = new HttpSolrClient.Builder(baseurl).build();
try {
expectedDocs = getDocCount(replica.getCoreName(), client);
GenericSolrRequest request;
ModifiableSolrParams params = new ModifiableSolrParams();
params.add(Q, query);
if (fields != null) params.add(FL, fields);
params.add(SORT, uniqueKey + " asc");
params.add(CommonParams.DISTRIB, "false");
params.add(CommonParams.ROWS, "1000");
String cursorMark = CursorMarkParams.CURSOR_MARK_START;
Consumer<SolrDocument> wrapper = doc -> {
try {
queue.offer(doc, 10, TimeUnit.SECONDS);
receivedDocs.incrementAndGet();
} catch (InterruptedException e) {
failed = true;
if (output != null) output.println("Failed to write docs from" + e.getMessage());
}
};
StreamingBinaryResponseParser responseParser = new StreamingBinaryResponseParser(getStreamer(wrapper));
while (true) {
if (failed) return false;
if (docsWritten.get() > limit) return true;
params.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
request = new GenericSolrRequest(SolrRequest.METHOD.GET,
"/" + replica.getCoreName() + "/select", params);
request.setResponseParser(responseParser);
try {
NamedList<Object> rsp = client.request(request);
String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT);
if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) {
if (output != null)
output.println(StrUtils.formatString("\nExport complete for : {0}, docs : {1}", replica.getCoreName(), receivedDocs.get()));
if (expectedDocs != receivedDocs.get()) {
if (output != null) {
output.println(StrUtils.formatString("Could not download all docs for core {0} , expected: {1} , actual",
replica.getCoreName(), expectedDocs, receivedDocs));
return false;
}
}
return true;
}
cursorMark = nextCursorMark;
if (output != null) output.print(".");
} catch (SolrServerException e) {
if(output != null) output.println("Error reading from server "+ replica.getBaseUrl()+"/"+ replica.getCoreName());
failed = true;
return false;
}
}
} finally {
client.close();
}
}
}
}
static long getDocCount(String coreName, HttpSolrClient client) throws SolrServerException, IOException {
SolrQuery q = new SolrQuery("*:*");
q.setRows(0);
q.add("distrib", "false");
GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET,
"/" + coreName + "/select", q);
NamedList<Object> res = client.request(request);
SolrDocumentList sdl = (SolrDocumentList) res.get("response");
return sdl.getNumFound();
}
}

View File

@ -19,14 +19,18 @@ package org.apache.solr.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
@ -34,6 +38,9 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.JsonRecordReader;
@ -71,82 +78,155 @@ public class TestExportTool extends SolrCloudTestCase {
String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
ExportTool.Info info = new ExportTool.Info(url);
ExportTool.Info info = new ExportTool.MultiThreadedRunner(url);
String absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
info.setOutFormat(absolutePath, "jsonl");
info.setLimit("200");
info.exportDocsWithCursorMark();
info.fields = "id,desc_s";
info.exportDocs();
assertTrue(info.docsWritten >= 200);
JsonRecordReader jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
Reader rdr = new InputStreamReader(new FileInputStream( absolutePath), StandardCharsets.UTF_8);
try {
int[] count = new int[]{0};
jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
assertTrue(count[0] >= 200);
} finally {
rdr.close();
}
assertJsonDocsCount(info, 200);
info = new ExportTool.Info(url);
info = new ExportTool.MultiThreadedRunner(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
info.setOutFormat(absolutePath, "jsonl");
info.setLimit("-1");
info.exportDocsWithCursorMark();
info.fields = "id,desc_s";
info.exportDocs();
assertTrue(info.docsWritten >= 1000);
jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
rdr = new InputStreamReader(new FileInputStream( absolutePath), StandardCharsets.UTF_8);
try {
int[] count = new int[]{0};
jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
assertTrue(count[0] >= 1000);
} finally {
rdr.close();
}
assertJsonDocsCount(info, 1000);
info = new ExportTool.Info(url);
info = new ExportTool.MultiThreadedRunner(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
info.setOutFormat(absolutePath, "javabin");
info.setLimit("200");
info.exportDocsWithCursorMark();
assertTrue(info.docsWritten >= 200);
info.fields = "id,desc_s";
info.exportDocs();
FileInputStream fis = new FileInputStream(absolutePath);
try {
int[] count = new int[]{0};
FastInputStream in = FastInputStream.wrap(fis);
new JavaBinUpdateRequestCodec()
.unmarshal(in, (document, req, commitWithin, override) -> count[0]++);
assertTrue(count[0] >= 200);
} finally {
fis.close();
}
assertJavabinDocsCount(info, 200);
info = new ExportTool.Info(url);
info = new ExportTool.MultiThreadedRunner(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
info.setOutFormat(absolutePath, "javabin");
info.setLimit("-1");
info.exportDocsWithCursorMark();
assertTrue(info.docsWritten >= 1000);
fis = new FileInputStream(absolutePath);
try {
int[] count = new int[]{0};
FastInputStream in = FastInputStream.wrap(fis);
new JavaBinUpdateRequestCodec()
.unmarshal(in, (document, req, commitWithin, override) -> count[0]++);
assertTrue(count[0] >= 1000);
} finally {
fis.close();
}
info.fields = "id,desc_s";
info.exportDocs();
assertJavabinDocsCount(info, 1000);
} finally {
cluster.shutdown();
}
}
@Nightly
public void testVeryLargeCluster() throws Exception {
String COLLECTION_NAME = "veryLargeColl";
MiniSolrCloudCluster cluster = configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
.configure();
try {
CollectionAdminRequest
.createCollection(COLLECTION_NAME, "conf", 8, 1)
.setMaxShardsPerNode(10)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION_NAME, 8, 8);
String tmpFileLoc = new File(cluster.getBaseDir().toFile().getAbsolutePath() +
File.separator).getPath();
String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
int docCount = 0;
for (int j = 0; j < 4; j++) {
int bsz = 10000;
UpdateRequest ur = new UpdateRequest();
ur.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
for (int i = 0; i < bsz; i++) {
ur.add("id", String.valueOf((j * bsz) + i), "desc_s", TestUtil.randomSimpleString(random(), 10, 50));
}
cluster.getSolrClient().request(ur, COLLECTION_NAME);
docCount += bsz;
}
QueryResponse qr = cluster.getSolrClient().query(COLLECTION_NAME, new SolrQuery("*:*").setRows(0));
assertEquals(docCount, qr.getResults().getNumFound());
DocCollection coll = cluster.getSolrClient().getClusterStateProvider().getCollection(COLLECTION_NAME);
HashMap<String, Long> docCounts = new HashMap<>();
long totalDocsFromCores = 0;
for (Slice slice : coll.getSlices()) {
Replica replica = slice.getLeader();
try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getBaseUrl()).build()) {
long count = ExportTool.getDocCount(replica.getCoreName(), client);
docCounts.put(replica.getCoreName(), count);
totalDocsFromCores += count;
}
}
assertEquals(docCount, totalDocsFromCores);
ExportTool.MultiThreadedRunner info = null;
String absolutePath = null;
info = new ExportTool.MultiThreadedRunner(url);
info.output = System.out;
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
info.setOutFormat(absolutePath, "javabin");
info.setLimit("-1");
info.exportDocs();
assertJavabinDocsCount(info, docCount);
for (Map.Entry<String, Long> e : docCounts.entrySet()) {
assertEquals(e.getValue().longValue(), info.corehandlers.get(e.getKey()).receivedDocs.get());
}
info = new ExportTool.MultiThreadedRunner(url);
info.output = System.out;
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
info.setOutFormat(absolutePath, "jsonl");
info.fields = "id,desc_s";
info.setLimit("-1");
info.exportDocs();
long actual = ((ExportTool.JsonSink) info.sink).docs.get();
assertTrue("docs written :" + actual + "docs produced : " + info.docsWritten.get(), actual >= docCount);
assertJsonDocsCount(info, docCount);
} finally {
cluster.shutdown();
}
}
private void assertJavabinDocsCount(ExportTool.Info info, int expected) throws IOException {
assertTrue("" + info.docsWritten.get() + " expected " + expected, info.docsWritten.get() >= expected);
FileInputStream fis = new FileInputStream(info.out);
try {
int[] count = new int[]{0};
FastInputStream in = FastInputStream.wrap(fis);
new JavaBinUpdateRequestCodec()
.unmarshal(in, (document, req, commitWithin, override) -> {
assertEquals(2, document.size());
count[0]++;
});
assertTrue(count[0] >= expected);
} finally {
fis.close();
}
}
private void assertJsonDocsCount(ExportTool.Info info, int expected) throws IOException {
assertTrue("" + info.docsWritten.get() + " expected " + expected, info.docsWritten.get() >= expected);
JsonRecordReader jsonReader;
Reader rdr;
jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
rdr = new InputStreamReader(new FileInputStream(info.out), StandardCharsets.UTF_8);
try {
int[] count = new int[]{0};
jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
assertTrue(count[0] >= expected);
} finally {
rdr.close();
}
}
}