diff --git a/solr/core/src/java/org/apache/solr/util/ExportTool.java b/solr/core/src/java/org/apache/solr/util/ExportTool.java index 7b5525e0107..ab29800ce01 100644 --- a/solr/core/src/java/org/apache/solr/util/ExportTool.java +++ b/solr/core/src/java/org/apache/solr/util/ExportTool.java @@ -17,42 +17,65 @@ package org.apache.solr.util; +import java.io.BufferedOutputStream; +import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintStream; +import java.io.Writer; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import com.google.common.collect.ImmutableSet; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; -import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.StreamingResponseCallback; import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.ClusterStateProvider; +import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.StreamingBinaryResponseParser; import org.apache.solr.client.solrj.request.GenericSolrRequest; -import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.SolrDocument; -import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CursorMarkParams; import org.apache.solr.common.params.MapSolrParams; -import org.apache.solr.common.util.FastWriter; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.JavaBinCodec; import org.apache.solr.common.util.NamedList; -import org.apache.solr.common.util.SolrJSONWriter; +import org.apache.solr.common.util.StrUtils; +import org.noggit.CharArr; +import org.noggit.JSONWriter; import static org.apache.solr.common.params.CommonParams.FL; import static org.apache.solr.common.params.CommonParams.JAVABIN; +import static org.apache.solr.common.params.CommonParams.Q; +import static org.apache.solr.common.params.CommonParams.SORT; +import static org.apache.solr.common.util.JavaBinCodec.SOLRINPUTDOC; public class ExportTool extends SolrCLI.ToolBase { @Override @@ -65,7 +88,7 @@ public class ExportTool extends SolrCLI.ToolBase { return OPTIONS; } - public static class Info { + public static abstract class Info { String baseurl; String format; String query; @@ -73,10 +96,12 @@ public class ExportTool extends SolrCLI.ToolBase { String out; String fields; long limit = 100; - long docsWritten = 0; + AtomicLong docsWritten = new AtomicLong(0); + int bufferSize = 1024 * 1024; PrintStream output; - //for testing purposes only - public SolrClient solrClient; + String uniqueKey; + CloudSolrClient solrClient; + DocsSink sink; public Info(String url) { @@ -117,60 +142,24 @@ public class ExportTool extends SolrCLI.ToolBase { return JAVABIN.equals(format) ? new JavabinSink(this) : new JsonSink(this); } - void exportDocsWithCursorMark() throws SolrServerException, IOException { - DocsSink sink = getSink(); + abstract void exportDocs() throws Exception; + + void fetchUniqueKey() throws SolrServerException, IOException { solrClient = new CloudSolrClient.Builder(Collections.singletonList(baseurl)).build(); - NamedList rsp1 = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey", + NamedList response = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey", new MapSolrParams(Collections.singletonMap("collection", coll)))); - String uniqueKey = (String) rsp1.get("uniqueKey"); - - sink.start(); - try { - NamedList rsp; - SolrQuery q = (new SolrQuery(query)) - .setParam("collection", coll) - .setRows(100) - .setSort(SolrQuery.SortClause.asc(uniqueKey)); - if (fields != null) { - q.setParam(FL, fields); - } - - String cursorMark = CursorMarkParams.CURSOR_MARK_START; - boolean done = false; - StreamingResponseCallback streamer = getStreamer(sink); - - if(output!= null) output.println("Exporting data to : "+ out); - while (!done) { - if (docsWritten >= limit) break; - QueryRequest request = new QueryRequest(q); - request.setResponseParser(new StreamingBinaryResponseParser(streamer)); - q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); - rsp = solrClient.request(request); - String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT); - if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) { - break; - } - cursorMark = nextCursorMark; - if(output!= null) output.print("."); - } - if(output!= null) output.println("\n DONE!"); - } finally { - sink.end(); - solrClient.close(); - - } + uniqueKey = (String) response.get("uniqueKey"); } - private StreamingResponseCallback getStreamer(DocsSink sink) { + public static StreamingResponseCallback getStreamer(Consumer sink) { return new StreamingResponseCallback() { @Override public void streamSolrDocument(SolrDocument doc) { try { sink.accept(doc); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } - docsWritten++; } @Override @@ -186,21 +175,24 @@ public class ExportTool extends SolrCLI.ToolBase { @Override protected void runImpl(CommandLine cli) throws Exception { - Info info = new Info(cli.getOptionValue("url")); + String url = cli.getOptionValue("url"); + Info info = new MultiThreadedRunner(url); info.query = cli.getOptionValue("query", "*:*"); info.setOutFormat(cli.getOptionValue("out"), cli.getOptionValue("format")); info.fields = cli.getOptionValue("fields"); info.setLimit(cli.getOptionValue("limit", "100")); info.output = super.stdout; - info.exportDocsWithCursorMark(); + info.exportDocs(); } interface DocsSink { - void start() throws IOException; + default void start() throws IOException { + } - void accept(SolrDocument document) throws IOException; + void accept(SolrDocument document) throws IOException, InterruptedException; - void end() throws IOException; + default void end() throws IOException { + } } private static final Option[] OPTIONS = { @@ -236,11 +228,13 @@ public class ExportTool extends SolrCLI.ToolBase { .create("fields") }; - private static class JsonSink implements DocsSink { + static class JsonSink implements DocsSink { private final Info info; - private SolrJSONWriter jsonw; - private FastWriter writer; - private FileOutputStream fos; + private CharArr charArr = new CharArr(1024 * 2); + JSONWriter jsonWriter = new JSONWriter(charArr, -1); + private Writer writer; + private OutputStream fos; + public AtomicLong docs = new AtomicLong(); public JsonSink(Info info) { this.info = info; @@ -249,24 +243,27 @@ public class ExportTool extends SolrCLI.ToolBase { @Override public void start() throws IOException { fos = new FileOutputStream(info.out); - writer = FastWriter.wrap(new OutputStreamWriter(fos, StandardCharsets.UTF_8)); - jsonw = new SolrJSONWriter(writer); - jsonw.setIndent(false); + if (info.bufferSize > 0) { + fos = new BufferedOutputStream(fos, info.bufferSize); + } + writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8); } @Override public void end() throws IOException { - jsonw.close(); + writer.flush(); + fos.flush(); fos.close(); - } @Override - public void accept(SolrDocument doc) throws IOException { + public synchronized void accept(SolrDocument doc) throws IOException { + docs.incrementAndGet(); + charArr.reset(); Map m = new LinkedHashMap(doc.size()); doc.forEach((s, field) -> { - if (s.equals("_version_")) return; + if (s.equals("_version_") || s.equals("_roor_")) return; if (field instanceof List) { if (((List) field).size() == 1) { field = ((List) field).get(0); @@ -274,17 +271,16 @@ public class ExportTool extends SolrCLI.ToolBase { } m.put(s, field); }); - jsonw.writeObj(m); - writer.flush(); + jsonWriter.write(m); + writer.write(charArr.getArray(), charArr.getStart(), charArr.getEnd()); writer.append('\n'); - } } private static class JavabinSink implements DocsSink { private final Info info; JavaBinCodec codec; - FileOutputStream fos; + OutputStream fos; public JavabinSink(Info info) { this.info = info; @@ -293,6 +289,9 @@ public class ExportTool extends SolrCLI.ToolBase { @Override public void start() throws IOException { fos = new FileOutputStream(info.out); + if (info.bufferSize > 0) { + fos = new BufferedOutputStream(fos, info.bufferSize); + } codec = new JavaBinCodec(fos, null); codec.writeTag(JavaBinCodec.NAMED_LST, 2); codec.writeStr("params"); @@ -306,23 +305,208 @@ public class ExportTool extends SolrCLI.ToolBase { public void end() throws IOException { codec.writeTag(JavaBinCodec.END); codec.close(); + fos.flush(); fos.close(); } + private BiConsumer bic= new BiConsumer<>() { + @Override + public void accept(String s, Object o) { + try { + if (s.equals("_version_") || s.equals("_root_")) return; + codec.writeExternString(s); + codec.writeVal(o); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; @Override - public void accept(SolrDocument doc) throws IOException { - SolrInputDocument document = new SolrInputDocument(); - doc.forEach((s, o) -> { - if (s.equals("_version_")) return; - if (o instanceof List) { - if (((List) o).size() == 1) o = ((List) o).get(0); - } - document.addField(s, o); - }); - - codec.writeSolrInputDocument(document); - + public synchronized void accept(SolrDocument doc) throws IOException { + int sz = doc.size(); + if(doc.containsKey("_version_")) sz--; + if(doc.containsKey("_root_")) sz--; + codec.writeTag(SOLRINPUTDOC, sz); + codec.writeFloat(1f); // document boost + doc.forEach(bic); } } + + static class MultiThreadedRunner extends Info { + ExecutorService producerThreadpool, consumerThreadpool; + ArrayBlockingQueue queue = new ArrayBlockingQueue(1000); + SolrDocument EOFDOC = new SolrDocument(); + volatile boolean failed = false; + Map corehandlers = new HashMap(); + + public MultiThreadedRunner(String url) { + super(url); + } + + + @Override + void exportDocs() throws Exception { + sink = getSink(); + fetchUniqueKey(); + ClusterStateProvider stateProvider = solrClient.getClusterStateProvider(); + DocCollection coll = stateProvider.getCollection(this.coll); + Map m = coll.getSlicesMap(); + producerThreadpool = ExecutorUtil.newMDCAwareFixedThreadPool(m.size(), + new DefaultSolrThreadFactory("solrcli-exporter-producers")); + consumerThreadpool = ExecutorUtil.newMDCAwareFixedThreadPool(1, + new DefaultSolrThreadFactory("solrcli-exporter-consumer")); + sink.start(); + CountDownLatch consumerlatch = new CountDownLatch(1); + try { + addConsumer(consumerlatch); + addProducers(m); + if (output != null) { + output.println("NO of shards : " + corehandlers.size()); + } + CountDownLatch producerLatch = new CountDownLatch(corehandlers.size()); + corehandlers.forEach((s, coreHandler) -> producerThreadpool.submit(() -> { + try { + coreHandler.exportDocsFromCore(); + } catch (Exception e) { + if(output != null) output.println("Error exporting docs from : "+s); + + } + producerLatch.countDown(); + })); + + producerLatch.await(); + queue.offer(EOFDOC, 10, TimeUnit.SECONDS); + consumerlatch.await(); + } finally { + sink.end(); + solrClient.close(); + producerThreadpool.shutdownNow(); + consumerThreadpool.shutdownNow(); + if (failed) { + try { + Files.delete(new File(out).toPath()); + } catch (IOException e) { + //ignore + } + } + } + } + + private void addProducers(Map m) { + for (Map.Entry entry : m.entrySet()) { + Slice slice = entry.getValue(); + Replica replica = slice.getLeader(); + if (replica == null) replica = slice.getReplicas().iterator().next();// get a random replica + CoreHandler coreHandler = new CoreHandler(replica); + corehandlers.put(replica.getCoreName(), coreHandler); + } + } + + private void addConsumer(CountDownLatch consumerlatch) { + consumerThreadpool.submit(() -> { + while (true) { + SolrDocument doc = null; + try { + doc = queue.poll(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + if (output != null) output.println("Consumer interrupted"); + failed = true; + break; + } + if (doc == EOFDOC) break; + try { + if (docsWritten.get() > limit) continue; + sink.accept(doc); + docsWritten.incrementAndGet(); + } catch (Exception e) { + if (output != null) output.println("Failed to write to file " + e.getMessage()); + failed = true; + } + } + consumerlatch.countDown(); + }); + } + + + class CoreHandler { + final Replica replica; + long expectedDocs; + AtomicLong receivedDocs = new AtomicLong(); + + CoreHandler(Replica replica) { + this.replica = replica; + } + + boolean exportDocsFromCore() + throws IOException, SolrServerException { + HttpSolrClient client = new HttpSolrClient.Builder(baseurl).build(); + try { + expectedDocs = getDocCount(replica.getCoreName(), client); + GenericSolrRequest request; + ModifiableSolrParams params = new ModifiableSolrParams(); + params.add(Q, query); + if (fields != null) params.add(FL, fields); + params.add(SORT, uniqueKey + " asc"); + params.add(CommonParams.DISTRIB, "false"); + params.add(CommonParams.ROWS, "1000"); + String cursorMark = CursorMarkParams.CURSOR_MARK_START; + Consumer wrapper = doc -> { + try { + queue.offer(doc, 10, TimeUnit.SECONDS); + receivedDocs.incrementAndGet(); + } catch (InterruptedException e) { + failed = true; + if (output != null) output.println("Failed to write docs from" + e.getMessage()); + } + }; + StreamingBinaryResponseParser responseParser = new StreamingBinaryResponseParser(getStreamer(wrapper)); + while (true) { + if (failed) return false; + if (docsWritten.get() > limit) return true; + params.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); + request = new GenericSolrRequest(SolrRequest.METHOD.GET, + "/" + replica.getCoreName() + "/select", params); + request.setResponseParser(responseParser); + try { + NamedList rsp = client.request(request); + String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT); + if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) { + if (output != null) + output.println(StrUtils.formatString("\nExport complete for : {0}, docs : {1}", replica.getCoreName(), receivedDocs.get())); + if (expectedDocs != receivedDocs.get()) { + if (output != null) { + output.println(StrUtils.formatString("Could not download all docs for core {0} , expected: {1} , actual", + replica.getCoreName(), expectedDocs, receivedDocs)); + return false; + } + } + return true; + } + cursorMark = nextCursorMark; + if (output != null) output.print("."); + } catch (SolrServerException e) { + if(output != null) output.println("Error reading from server "+ replica.getBaseUrl()+"/"+ replica.getCoreName()); + failed = true; + return false; + } + } + } finally { + client.close(); + } + } + } + } + + + static long getDocCount(String coreName, HttpSolrClient client) throws SolrServerException, IOException { + SolrQuery q = new SolrQuery("*:*"); + q.setRows(0); + q.add("distrib", "false"); + GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, + "/" + coreName + "/select", q); + NamedList res = client.request(request); + SolrDocumentList sdl = (SolrDocumentList) res.get("response"); + return sdl.getNumFound(); + } } diff --git a/solr/core/src/test/org/apache/solr/util/TestExportTool.java b/solr/core/src/test/org/apache/solr/util/TestExportTool.java index 1fe5f66b896..fdfb3c09b2e 100644 --- a/solr/core/src/test/org/apache/solr/util/TestExportTool.java +++ b/solr/core/src/test/org/apache/solr/util/TestExportTool.java @@ -19,14 +19,18 @@ package org.apache.solr.util; import java.io.File; import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import org.apache.lucene.util.TestUtil; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec; @@ -34,6 +38,9 @@ import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.MiniSolrCloudCluster; import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.util.FastInputStream; import org.apache.solr.common.util.JsonRecordReader; @@ -71,82 +78,155 @@ public class TestExportTool extends SolrCloudTestCase { String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME; - ExportTool.Info info = new ExportTool.Info(url); - + ExportTool.Info info = new ExportTool.MultiThreadedRunner(url); String absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json"; info.setOutFormat(absolutePath, "jsonl"); info.setLimit("200"); - info.exportDocsWithCursorMark(); + info.fields = "id,desc_s"; + info.exportDocs(); - assertTrue(info.docsWritten >= 200); - JsonRecordReader jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**")); - Reader rdr = new InputStreamReader(new FileInputStream( absolutePath), StandardCharsets.UTF_8); - try { - int[] count = new int[]{0}; - jsonReader.streamRecords(rdr, (record, path) -> count[0]++); - assertTrue(count[0] >= 200); - } finally { - rdr.close(); - } + assertJsonDocsCount(info, 200); - - info = new ExportTool.Info(url); + info = new ExportTool.MultiThreadedRunner(url); absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json"; info.setOutFormat(absolutePath, "jsonl"); info.setLimit("-1"); - info.exportDocsWithCursorMark(); + info.fields = "id,desc_s"; + info.exportDocs(); - assertTrue(info.docsWritten >= 1000); - jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**")); - rdr = new InputStreamReader(new FileInputStream( absolutePath), StandardCharsets.UTF_8); - try { - int[] count = new int[]{0}; - jsonReader.streamRecords(rdr, (record, path) -> count[0]++); - assertTrue(count[0] >= 1000); - } finally { - rdr.close(); - } + assertJsonDocsCount(info, 1000); - - info = new ExportTool.Info(url); + info = new ExportTool.MultiThreadedRunner(url); absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin"; info.setOutFormat(absolutePath, "javabin"); info.setLimit("200"); - info.exportDocsWithCursorMark(); - assertTrue(info.docsWritten >= 200); + info.fields = "id,desc_s"; + info.exportDocs(); - FileInputStream fis = new FileInputStream(absolutePath); - try { - int[] count = new int[]{0}; - FastInputStream in = FastInputStream.wrap(fis); - new JavaBinUpdateRequestCodec() - .unmarshal(in, (document, req, commitWithin, override) -> count[0]++); - assertTrue(count[0] >= 200); - } finally { - fis.close(); - } + assertJavabinDocsCount(info, 200); - info = new ExportTool.Info(url); + info = new ExportTool.MultiThreadedRunner(url); absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin"; info.setOutFormat(absolutePath, "javabin"); info.setLimit("-1"); - info.exportDocsWithCursorMark(); - assertTrue(info.docsWritten >= 1000); - - fis = new FileInputStream(absolutePath); - try { - int[] count = new int[]{0}; - FastInputStream in = FastInputStream.wrap(fis); - new JavaBinUpdateRequestCodec() - .unmarshal(in, (document, req, commitWithin, override) -> count[0]++); - assertTrue(count[0] >= 1000); - } finally { - fis.close(); - } + info.fields = "id,desc_s"; + info.exportDocs(); + assertJavabinDocsCount(info, 1000); } finally { cluster.shutdown(); } } + + @Nightly + public void testVeryLargeCluster() throws Exception { + String COLLECTION_NAME = "veryLargeColl"; + MiniSolrCloudCluster cluster = configureCluster(4) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + + try { + CollectionAdminRequest + .createCollection(COLLECTION_NAME, "conf", 8, 1) + .setMaxShardsPerNode(10) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(COLLECTION_NAME, 8, 8); + + String tmpFileLoc = new File(cluster.getBaseDir().toFile().getAbsolutePath() + + File.separator).getPath(); + String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME; + + + int docCount = 0; + + for (int j = 0; j < 4; j++) { + int bsz = 10000; + UpdateRequest ur = new UpdateRequest(); + ur.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true); + for (int i = 0; i < bsz; i++) { + ur.add("id", String.valueOf((j * bsz) + i), "desc_s", TestUtil.randomSimpleString(random(), 10, 50)); + } + cluster.getSolrClient().request(ur, COLLECTION_NAME); + docCount += bsz; + } + + QueryResponse qr = cluster.getSolrClient().query(COLLECTION_NAME, new SolrQuery("*:*").setRows(0)); + assertEquals(docCount, qr.getResults().getNumFound()); + + DocCollection coll = cluster.getSolrClient().getClusterStateProvider().getCollection(COLLECTION_NAME); + HashMap docCounts = new HashMap<>(); + long totalDocsFromCores = 0; + for (Slice slice : coll.getSlices()) { + Replica replica = slice.getLeader(); + try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getBaseUrl()).build()) { + long count = ExportTool.getDocCount(replica.getCoreName(), client); + docCounts.put(replica.getCoreName(), count); + totalDocsFromCores += count; + } + } + assertEquals(docCount, totalDocsFromCores); + + ExportTool.MultiThreadedRunner info = null; + String absolutePath = null; + + info = new ExportTool.MultiThreadedRunner(url); + info.output = System.out; + absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin"; + info.setOutFormat(absolutePath, "javabin"); + info.setLimit("-1"); + info.exportDocs(); + assertJavabinDocsCount(info, docCount); + for (Map.Entry e : docCounts.entrySet()) { + assertEquals(e.getValue().longValue(), info.corehandlers.get(e.getKey()).receivedDocs.get()); + } + info = new ExportTool.MultiThreadedRunner(url); + info.output = System.out; + absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json"; + info.setOutFormat(absolutePath, "jsonl"); + info.fields = "id,desc_s"; + info.setLimit("-1"); + info.exportDocs(); + long actual = ((ExportTool.JsonSink) info.sink).docs.get(); + assertTrue("docs written :" + actual + "docs produced : " + info.docsWritten.get(), actual >= docCount); + assertJsonDocsCount(info, docCount); + } finally { + cluster.shutdown(); + + } + } + + + private void assertJavabinDocsCount(ExportTool.Info info, int expected) throws IOException { + assertTrue("" + info.docsWritten.get() + " expected " + expected, info.docsWritten.get() >= expected); + FileInputStream fis = new FileInputStream(info.out); + try { + int[] count = new int[]{0}; + FastInputStream in = FastInputStream.wrap(fis); + new JavaBinUpdateRequestCodec() + .unmarshal(in, (document, req, commitWithin, override) -> { + assertEquals(2, document.size()); + count[0]++; + }); + assertTrue(count[0] >= expected); + } finally { + fis.close(); + } + } + + private void assertJsonDocsCount(ExportTool.Info info, int expected) throws IOException { + assertTrue("" + info.docsWritten.get() + " expected " + expected, info.docsWritten.get() >= expected); + + JsonRecordReader jsonReader; + Reader rdr; + jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**")); + rdr = new InputStreamReader(new FileInputStream(info.out), StandardCharsets.UTF_8); + try { + int[] count = new int[]{0}; + jsonReader.streamRecords(rdr, (record, path) -> count[0]++); + assertTrue(count[0] >= expected); + } finally { + rdr.close(); + } + } }