diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 59dde90338a..874ac81db88 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -206,6 +206,8 @@ New Features * SOLR-9891: Add mkroot command to bin/solr and bin/solr.cmd (Erick Erickson) +* SOLR-9668,SOLR-7197: introduce cursorMark='true' in SolrEntityProcessor (Yegor Kozlov, Raveendra Yerraguntl via Mikhail Khludnev) + Optimizations ---------------------- * SOLR-9704: Facet Module / JSON Facet API: Optimize blockChildren facets that have diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java index 5e62731879e..6d8726f91dc 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java @@ -16,6 +16,18 @@ */ package org.apache.solr.handler.dataimport; +import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE; +import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + import org.apache.http.client.HttpClient; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; @@ -27,22 +39,12 @@ import org.apache.solr.client.solrj.impl.XMLResponseParser; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.CursorMarkParams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE; -import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow; - /** *

* An implementation of {@link EntityProcessor} which fetches values from a @@ -139,81 +141,53 @@ public class SolrEntityProcessor extends EntityProcessorBase { * The following method changes the rowIterator mutable field. It requires * external synchronization. */ - private void buildIterator() { + protected void buildIterator() { if (rowIterator != null) { SolrDocumentListIterator documentListIterator = (SolrDocumentListIterator) rowIterator; if (!documentListIterator.hasNext() && documentListIterator.hasMoreRows()) { - SolrDocumentList solrDocumentList = doQuery(documentListIterator - .getStart() + documentListIterator.getSize()); - if (solrDocumentList != null) { - rowIterator = new SolrDocumentListIterator(solrDocumentList); - } + nextPage(); } - } else { - SolrDocumentList solrDocumentList = doQuery(0); - if (solrDocumentList != null) { - rowIterator = new SolrDocumentListIterator(solrDocumentList); - } - return; + } else { + Boolean cursor = new Boolean(context + .getResolvedEntityAttribute(CursorMarkParams.CURSOR_MARK_PARAM)); + rowIterator = !cursor ? new SolrDocumentListIterator(new SolrDocumentList()) + : new SolrDocumentListCursor(new SolrDocumentList(), CursorMarkParams.CURSOR_MARK_START); + nextPage(); } } - protected SolrDocumentList doQuery(int start) { - this.queryString = context.getResolvedEntityAttribute(QUERY); - if (this.queryString == null) { - throw new DataImportHandlerException( - DataImportHandlerException.SEVERE, - "SolrEntityProcessor: parameter 'query' is required" - ); - } + protected void nextPage() { + ((SolrDocumentListIterator)rowIterator).doQuery(); + } - String rowsP = context.getResolvedEntityAttribute(CommonParams.ROWS); - if (rowsP != null) { - rows = Integer.parseInt(rowsP); - } - - String fqAsString = context.getResolvedEntityAttribute(CommonParams.FQ); - if (fqAsString != null) { - this.filterQueries = fqAsString.split(","); - } - - String fieldsAsString = context.getResolvedEntityAttribute(CommonParams.FL); - if (fieldsAsString != null) { - this.fields = fieldsAsString.split(","); - } - this.requestHandler = context.getResolvedEntityAttribute(CommonParams.QT); - String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT); - if (timeoutAsString != null) { - this.timeout = Integer.parseInt(timeoutAsString); - } - - SolrQuery solrQuery = new SolrQuery(queryString); - solrQuery.setRows(rows); - solrQuery.setStart(start); - if (fields != null) { - for (String field : fields) { - solrQuery.addField(field); - } - } - solrQuery.setRequestHandler(requestHandler); - solrQuery.setFilterQueries(filterQueries); - solrQuery.setTimeAllowed(timeout * 1000); + class SolrDocumentListCursor extends SolrDocumentListIterator { - QueryResponse response = null; - try { - response = solrClient.query(solrQuery); - } catch (SolrServerException | IOException e) { - if (ABORT.equals(onError)) { - wrapAndThrow(SEVERE, e); - } else if (SKIP.equals(onError)) { - wrapAndThrow(DataImportHandlerException.SKIP_ROW, e); + private final String cursorMark; + + public SolrDocumentListCursor(SolrDocumentList solrDocumentList, String cursorMark) { + super(solrDocumentList); + this.cursorMark = cursorMark; + } + + @Override + protected void passNextPage(SolrQuery solrQuery) { + String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT); + if (timeoutAsString != null) { + throw new DataImportHandlerException(SEVERE,"cursorMark can't be used with timeout"); } + + solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); } - return response == null ? null : response.getResults(); + @Override + protected Iterator> createNextPageIterator(QueryResponse response) { + return + new SolrDocumentListCursor(response.getResults(), + response.getNextCursorMark()) ; + } } - private static class SolrDocumentListIterator implements Iterator> { + class SolrDocumentListIterator implements Iterator> { private final int start; private final int size; @@ -230,6 +204,84 @@ public class SolrEntityProcessor extends EntityProcessorBase { this.size = solrDocumentList.size(); } + protected QueryResponse doQuery() { + SolrEntityProcessor.this.queryString = context.getResolvedEntityAttribute(QUERY); + if (SolrEntityProcessor.this.queryString == null) { + throw new DataImportHandlerException( + DataImportHandlerException.SEVERE, + "SolrEntityProcessor: parameter 'query' is required" + ); + } + + String rowsP = context.getResolvedEntityAttribute(CommonParams.ROWS); + if (rowsP != null) { + rows = Integer.parseInt(rowsP); + } + + String sortParam = context.getResolvedEntityAttribute(CommonParams.SORT); + + String fqAsString = context.getResolvedEntityAttribute(CommonParams.FQ); + if (fqAsString != null) { + SolrEntityProcessor.this.filterQueries = fqAsString.split(","); + } + + String fieldsAsString = context.getResolvedEntityAttribute(CommonParams.FL); + if (fieldsAsString != null) { + SolrEntityProcessor.this.fields = fieldsAsString.split(","); + } + SolrEntityProcessor.this.requestHandler = context.getResolvedEntityAttribute(CommonParams.QT); + + + SolrQuery solrQuery = new SolrQuery(queryString); + solrQuery.setRows(rows); + + if (sortParam!=null) { + solrQuery.setParam(CommonParams.SORT, sortParam); + } + + passNextPage(solrQuery); + + if (fields != null) { + for (String field : fields) { + solrQuery.addField(field); + } + } + solrQuery.setRequestHandler(requestHandler); + solrQuery.setFilterQueries(filterQueries); + + + QueryResponse response = null; + try { + response = solrClient.query(solrQuery); + } catch (SolrServerException | IOException | SolrException e) { + if (ABORT.equals(onError)) { + wrapAndThrow(SEVERE, e); + } else if (SKIP.equals(onError)) { + wrapAndThrow(DataImportHandlerException.SKIP_ROW, e); + } + } + + if (response != null) { + SolrEntityProcessor.this.rowIterator = createNextPageIterator(response); + } + return response; + } + + protected Iterator> createNextPageIterator(QueryResponse response) { + return new SolrDocumentListIterator(response.getResults()); + } + + protected void passNextPage(SolrQuery solrQuery) { + String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT); + if (timeoutAsString != null) { + SolrEntityProcessor.this.timeout = Integer.parseInt(timeoutAsString); + } + + solrQuery.setTimeAllowed(timeout * 1000); + + solrQuery.setStart(getStart() + getSize()); + } + @Override public boolean hasNext() { return solrDocumentIterator.hasNext(); diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java index 4ebca306ed8..42e5f7d3e48 100644 --- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java +++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java @@ -29,16 +29,28 @@ public class MockSolrEntityProcessor extends SolrEntityProcessor { private int queryCount = 0; private int rows; + + private int start = 0; public MockSolrEntityProcessor(List docsData, int rows) { this.docsData = docsData; this.rows = rows; } + //@Override + //protected SolrDocumentList doQuery(int start) { + // queryCount++; + // return getDocs(start, rows); + // } + @Override - protected SolrDocumentList doQuery(int start) { - queryCount++; - return getDocs(start, rows); + protected void buildIterator() { + if (rowIterator==null || (!rowIterator.hasNext() && ((SolrDocumentListIterator)rowIterator).hasMoreRows())){ + queryCount++; + SolrDocumentList docs = getDocs(start, rows); + rowIterator = new SolrDocumentListIterator(docs); + start += docs.size(); + } } private SolrDocumentList getDocs(int start, int rows) { diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java index 8ef94c02c76..9e104eeb7a4 100644 --- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java +++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java @@ -34,6 +34,8 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -179,7 +181,7 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe try { addDocumentsToSolr(generateSolrDocuments(7)); - runFullImport(generateDIHConfig("query='*:*' fl='id' rows='2'", false)); + runFullImport(generateDIHConfig("query='*:*' fl='id' rows='2'"+(random().nextBoolean() ?" cursorMark='true' sort='id asc'":""), false)); } catch (Exception e) { LOG.error(e.getMessage(), e); fail(e.getMessage()); @@ -252,7 +254,8 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe assertQ(req("*:*"), "//result[@numFound='0']"); try { - runFullImport(generateDIHConfig("query='bogus:3' rows='2' fl='id,desc' onError='abort'", false)); + runFullImport(generateDIHConfig("query='bogus:3' rows='2' fl='id,desc' onError='"+ + (random().nextBoolean() ? "abort" : "justtogetcoverage")+"'", false)); } catch (Exception e) { LOG.error(e.getMessage(), e); fail(e.getMessage()); @@ -260,7 +263,27 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe assertQ(req("*:*"), "//result[@numFound='0']"); } + + public void testCursorMarkNoSort() throws SolrServerException, IOException { + assertQ(req("*:*"), "//result[@numFound='0']"); + addDocumentsToSolr(generateSolrDocuments(7)); + try { + List errors = Arrays.asList("sort='id'", //wrong sort spec + "", //no sort spec + "sort='id asc' timeout='12345'"); // sort is fine, but set timeout + Collections.shuffle(errors, random()); + String attrs = "query='*:*' rows='2' fl='id,desc' cursorMark='true' " + + errors.get(0); + runFullImport(generateDIHConfig(attrs, + false)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + fail(e.getMessage()); + } + assertQ(req("*:*"), "//result[@numFound='0']"); + } + private static List> generateSolrDocuments(int num) { List> docList = new ArrayList<>(); for (int i = 1; i <= num; i++) { diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java index a8fcbb18cb5..a2a9fffa9ab 100644 --- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java +++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java @@ -18,11 +18,23 @@ package org.apache.solr.handler.dataimport; import java.util.*; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.CursorMarkParams; +import org.apache.solr.handler.dataimport.SolrEntityProcessor.SolrDocumentListIterator; +import org.junit.Test; + /** * Unit test of SolrEntityProcessor. A very basic test outside of the DIH. */ public class TestSolrEntityProcessorUnit extends AbstractDataImportHandlerTestCase { + private static final class NoNextMockProcessor extends SolrEntityProcessor { + @Override + protected void nextPage() { + } + } + private static final String ID = "id"; public void testQuery() { @@ -85,6 +97,64 @@ public class TestSolrEntityProcessorUnit extends AbstractDataImportHandlerTestCa processor.destroy(); } } + @Test (expected = DataImportHandlerException.class) + public void testNoQuery() { + SolrEntityProcessor processor = new SolrEntityProcessor(); + + HashMap entityAttrs = new HashMap(){{put(SolrEntityProcessor.SOLR_SERVER,"http://route:66/no");}}; + processor.init(getContext(null, null, null, null, Collections.emptyList(), + entityAttrs)); + try { + processor.buildIterator(); + }finally { + processor.destroy(); + } + } + + public void testPagingQuery() { + SolrEntityProcessor processor = new NoNextMockProcessor() ; + + HashMap entityAttrs = new HashMap(){{ + put(SolrEntityProcessor.SOLR_SERVER,"http://route:66/no"); + if (random().nextBoolean()) { + List noCursor = Arrays.asList("","false",CursorMarkParams.CURSOR_MARK_START);//only 'true' not '*' + Collections.shuffle(noCursor, random()); + put(CursorMarkParams.CURSOR_MARK_PARAM, noCursor.get(0)); + }}}; + processor.init(getContext(null, null, null, null, Collections.emptyList(), + entityAttrs)); + try { + processor.buildIterator(); + SolrQuery query = new SolrQuery(); + ((SolrDocumentListIterator) processor.rowIterator).passNextPage(query); + assertEquals("0", query.get(CommonParams.START)); + assertNull( query.get(CursorMarkParams.CURSOR_MARK_PARAM)); + assertNotNull( query.get(CommonParams.TIME_ALLOWED)); + }finally { + processor.destroy(); + } + } + + public void testCursorQuery() { + SolrEntityProcessor processor = new NoNextMockProcessor() ; + + HashMap entityAttrs = new HashMap(){{ + put(SolrEntityProcessor.SOLR_SERVER,"http://route:66/no"); + put(CursorMarkParams.CURSOR_MARK_PARAM,"true"); + }}; + processor.init(getContext(null, null, null, null, Collections.emptyList(), + entityAttrs)); + try { + processor.buildIterator(); + SolrQuery query = new SolrQuery(); + ((SolrDocumentListIterator) processor.rowIterator).passNextPage(query); + assertNull(query.get(CommonParams.START)); + assertEquals(CursorMarkParams.CURSOR_MARK_START, query.get(CursorMarkParams.CURSOR_MARK_PARAM)); + assertNull( query.get(CommonParams.TIME_ALLOWED)); + }finally { + processor.destroy(); + } + } private List generateUniqueDocs(int numDocs) { List types = new ArrayList<>();