SOLR-9668: introduce cursorMark='true' for SolrEntityProcessor

This commit is contained in:
Mikhail Khludnev 2016-12-27 15:34:12 +03:00
parent 26ee8e9bea
commit cc862d8e67
5 changed files with 237 additions and 78 deletions

View File

@ -206,6 +206,8 @@ New Features
* SOLR-9891: Add mkroot command to bin/solr and bin/solr.cmd (Erick Erickson)
* SOLR-9668,SOLR-7197: introduce cursorMark='true' in SolrEntityProcessor (Yegor Kozlov, Raveendra Yerraguntl via Mikhail Khludnev)
Optimizations
----------------------
* SOLR-9704: Facet Module / JSON Facet API: Optimize blockChildren facets that have

View File

@ -16,6 +16,18 @@
*/
package org.apache.solr.handler.dataimport;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
@ -27,22 +39,12 @@ import org.apache.solr.client.solrj.impl.XMLResponseParser;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CursorMarkParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
/**
* <p>
* An implementation of {@link EntityProcessor} which fetches values from a
@ -139,81 +141,53 @@ public class SolrEntityProcessor extends EntityProcessorBase {
* The following method changes the rowIterator mutable field. It requires
* external synchronization.
*/
private void buildIterator() {
protected void buildIterator() {
if (rowIterator != null) {
SolrDocumentListIterator documentListIterator = (SolrDocumentListIterator) rowIterator;
if (!documentListIterator.hasNext() && documentListIterator.hasMoreRows()) {
SolrDocumentList solrDocumentList = doQuery(documentListIterator
.getStart() + documentListIterator.getSize());
if (solrDocumentList != null) {
rowIterator = new SolrDocumentListIterator(solrDocumentList);
}
nextPage();
}
} else {
SolrDocumentList solrDocumentList = doQuery(0);
if (solrDocumentList != null) {
rowIterator = new SolrDocumentListIterator(solrDocumentList);
}
return;
} else {
Boolean cursor = new Boolean(context
.getResolvedEntityAttribute(CursorMarkParams.CURSOR_MARK_PARAM));
rowIterator = !cursor ? new SolrDocumentListIterator(new SolrDocumentList())
: new SolrDocumentListCursor(new SolrDocumentList(), CursorMarkParams.CURSOR_MARK_START);
nextPage();
}
}
protected SolrDocumentList doQuery(int start) {
this.queryString = context.getResolvedEntityAttribute(QUERY);
if (this.queryString == null) {
throw new DataImportHandlerException(
DataImportHandlerException.SEVERE,
"SolrEntityProcessor: parameter 'query' is required"
);
}
String rowsP = context.getResolvedEntityAttribute(CommonParams.ROWS);
if (rowsP != null) {
rows = Integer.parseInt(rowsP);
}
String fqAsString = context.getResolvedEntityAttribute(CommonParams.FQ);
if (fqAsString != null) {
this.filterQueries = fqAsString.split(",");
}
String fieldsAsString = context.getResolvedEntityAttribute(CommonParams.FL);
if (fieldsAsString != null) {
this.fields = fieldsAsString.split(",");
}
this.requestHandler = context.getResolvedEntityAttribute(CommonParams.QT);
String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT);
if (timeoutAsString != null) {
this.timeout = Integer.parseInt(timeoutAsString);
}
SolrQuery solrQuery = new SolrQuery(queryString);
solrQuery.setRows(rows);
solrQuery.setStart(start);
if (fields != null) {
for (String field : fields) {
solrQuery.addField(field);
}
}
solrQuery.setRequestHandler(requestHandler);
solrQuery.setFilterQueries(filterQueries);
solrQuery.setTimeAllowed(timeout * 1000);
QueryResponse response = null;
try {
response = solrClient.query(solrQuery);
} catch (SolrServerException | IOException e) {
if (ABORT.equals(onError)) {
wrapAndThrow(SEVERE, e);
} else if (SKIP.equals(onError)) {
wrapAndThrow(DataImportHandlerException.SKIP_ROW, e);
}
}
return response == null ? null : response.getResults();
protected void nextPage() {
  // The current page iterator knows how to query for the page that follows it.
  final SolrDocumentListIterator currentPage = (SolrDocumentListIterator) rowIterator;
  currentPage.doQuery();
}
private static class SolrDocumentListIterator implements Iterator<Map<String,Object>> {
/**
 * Page iterator that pages with a cursor mark instead of a start offset. Each instance keeps
 * the cursor mark that is sent when the page after it is requested.
 */
class SolrDocumentListCursor extends SolrDocumentListIterator {

  private final String cursorMark;

  public SolrDocumentListCursor(SolrDocumentList solrDocumentList, String cursorMark) {
    super(solrDocumentList);
    this.cursorMark = cursorMark;
  }

  /**
   * Puts this page's cursor mark on the query.
   *
   * @throws DataImportHandlerException if the entity declares a timeout attribute
   */
  @Override
  protected void passNextPage(SolrQuery solrQuery) {
    if (context.getResolvedEntityAttribute(TIMEOUT) != null) {
      throw new DataImportHandlerException(SEVERE,"cursorMark can't be used with timeout");
    }
    solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
  }

  /** Wraps the response's documents together with the next cursor mark the response reports. */
  @Override
  protected Iterator<Map<String,Object>> createNextPageIterator(QueryResponse response) {
    final SolrDocumentList pageDocs = response.getResults();
    final String followingMark = response.getNextCursorMark();
    return new SolrDocumentListCursor(pageDocs, followingMark);
  }
}
class SolrDocumentListIterator implements Iterator<Map<String,Object>> {
private final int start;
private final int size;
@ -230,6 +204,84 @@ public class SolrEntityProcessor extends EntityProcessorBase {
this.size = solrDocumentList.size();
}
/**
 * Resolves the entity attributes (query, rows, sort, fq, fl, qt), builds the query for the
 * page following this one, executes it and, when a response arrives, installs the iterator
 * created from it as the processor's {@code rowIterator}.
 *
 * @return the query response, or {@code null} when the query raised an exception that was
 *         not rethrown
 * @throws DataImportHandlerException if the 'query' attribute is not set
 */
protected QueryResponse doQuery() {
  final String resolvedQuery = context.getResolvedEntityAttribute(QUERY);
  SolrEntityProcessor.this.queryString = resolvedQuery;
  if (resolvedQuery == null) {
    throw new DataImportHandlerException(
        DataImportHandlerException.SEVERE,
        "SolrEntityProcessor: parameter 'query' is required"
    );
  }

  final String rowsAttr = context.getResolvedEntityAttribute(CommonParams.ROWS);
  if (rowsAttr != null) {
    rows = Integer.parseInt(rowsAttr);
  }

  final String sortAttr = context.getResolvedEntityAttribute(CommonParams.SORT);

  // fq and fl are comma-separated lists in the entity configuration.
  final String fqAttr = context.getResolvedEntityAttribute(CommonParams.FQ);
  if (fqAttr != null) {
    SolrEntityProcessor.this.filterQueries = fqAttr.split(",");
  }
  final String flAttr = context.getResolvedEntityAttribute(CommonParams.FL);
  if (flAttr != null) {
    SolrEntityProcessor.this.fields = flAttr.split(",");
  }
  SolrEntityProcessor.this.requestHandler = context.getResolvedEntityAttribute(CommonParams.QT);

  final SolrQuery pageQuery = new SolrQuery(queryString);
  pageQuery.setRows(rows);
  if (sortAttr != null) {
    pageQuery.setParam(CommonParams.SORT, sortAttr);
  }
  // Paging strategy hook: subclasses decide how the next page is addressed.
  passNextPage(pageQuery);
  if (fields != null) {
    for (int i = 0; i < fields.length; i++) {
      pageQuery.addField(fields[i]);
    }
  }
  pageQuery.setRequestHandler(requestHandler);
  pageQuery.setFilterQueries(filterQueries);

  QueryResponse response = null;
  try {
    response = solrClient.query(pageQuery);
  } catch (SolrServerException | IOException | SolrException e) {
    // Any other onError value leaves the failure unreported and the response null.
    if (ABORT.equals(onError)) {
      wrapAndThrow(SEVERE, e);
    } else if (SKIP.equals(onError)) {
      wrapAndThrow(DataImportHandlerException.SKIP_ROW, e);
    }
  }

  if (response == null) {
    return null;
  }
  SolrEntityProcessor.this.rowIterator = createNextPageIterator(response);
  return response;
}
/** Creates the iterator over the documents carried by the given response. */
protected Iterator<Map<String,Object>> createNextPageIterator(QueryResponse response) {
  final SolrDocumentList pageDocs = response.getResults();
  return new SolrDocumentListIterator(pageDocs);
}
/**
 * Applies offset-based paging to the query: sets the time allowed to the processor's timeout
 * multiplied by 1000 (after refreshing the timeout from the entity attribute, when present)
 * and sets the start offset to just past this page.
 */
protected void passNextPage(SolrQuery solrQuery) {
  final String configuredTimeout = context.getResolvedEntityAttribute(TIMEOUT);
  if (configuredTimeout != null) {
    SolrEntityProcessor.this.timeout = Integer.parseInt(configuredTimeout);
  }
  final int timeAllowed = timeout * 1000;
  solrQuery.setTimeAllowed(timeAllowed);

  final int nextStart = getStart() + getSize();
  solrQuery.setStart(nextStart);
}
@Override
public boolean hasNext() {
return solrDocumentIterator.hasNext();

View File

@ -30,15 +30,27 @@ public class MockSolrEntityProcessor extends SolrEntityProcessor {
private int rows;
private int start = 0;
/** Stores the fixture documents and the rows value later handed to getDocs. */
public MockSolrEntityProcessor(List<SolrTestCaseJ4.Doc> docsData, int rows) {
  this.rows = rows;
  this.docsData = docsData;
}
//@Override
//protected SolrDocumentList doQuery(int start) {
// queryCount++;
// return getDocs(start, rows);
// }
@Override
protected SolrDocumentList doQuery(int start) {
queryCount++;
return getDocs(start, rows);
protected void buildIterator() {
  // A page is needed on the first call, or once the current page is drained while the
  // iterator still reports more rows; otherwise leave the current iterator alone.
  if (rowIterator != null) {
    final boolean drained = !rowIterator.hasNext();
    if (!drained || !((SolrDocumentListIterator) rowIterator).hasMoreRows()) {
      return;
    }
  }

  queryCount++;
  final SolrDocumentList page = getDocs(start, rows);
  rowIterator = new SolrDocumentListIterator(page);
  // Advance the offset by what was actually served.
  start += page.size();
}
private SolrDocumentList getDocs(int start, int rows) {

View File

@ -34,6 +34,8 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -179,7 +181,7 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe
try {
addDocumentsToSolr(generateSolrDocuments(7));
runFullImport(generateDIHConfig("query='*:*' fl='id' rows='2'", false));
runFullImport(generateDIHConfig("query='*:*' fl='id' rows='2'"+(random().nextBoolean() ?" cursorMark='true' sort='id asc'":""), false));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
fail(e.getMessage());
@ -252,7 +254,28 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe
assertQ(req("*:*"), "//result[@numFound='0']");
try {
runFullImport(generateDIHConfig("query='bogus:3' rows='2' fl='id,desc' onError='abort'", false));
runFullImport(generateDIHConfig("query='bogus:3' rows='2' fl='id,desc' onError='"+
(random().nextBoolean() ? "abort" : "justtogetcoverage")+"'", false));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
fail(e.getMessage());
}
assertQ(req("*:*"), "//result[@numFound='0']");
}
public void testCursorMarkNoSort() throws SolrServerException, IOException {
assertQ(req("*:*"), "//result[@numFound='0']");
addDocumentsToSolr(generateSolrDocuments(7));
try {
List<String> errors = Arrays.asList("sort='id'", //wrong sort spec
"", //no sort spec
"sort='id asc' timeout='12345'"); // sort is fine, but set timeout
Collections.shuffle(errors, random());
String attrs = "query='*:*' rows='2' fl='id,desc' cursorMark='true' "
+ errors.get(0);
runFullImport(generateDIHConfig(attrs,
false));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
fail(e.getMessage());

View File

@ -18,11 +18,23 @@ package org.apache.solr.handler.dataimport;
import java.util.*;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CursorMarkParams;
import org.apache.solr.handler.dataimport.SolrEntityProcessor.SolrDocumentListIterator;
import org.junit.Test;
/**
* Unit test of SolrEntityProcessor. A very basic test outside of the DIH.
*/
public class TestSolrEntityProcessorUnit extends AbstractDataImportHandlerTestCase {
/** Processor whose {@code nextPage()} does nothing, so requesting a page performs no work. */
private static final class NoNextMockProcessor extends SolrEntityProcessor {
@Override
protected void nextPage() {
// intentionally empty
}
}
private static final String ID = "id";
public void testQuery() {
@ -85,6 +97,64 @@ public class TestSolrEntityProcessorUnit extends AbstractDataImportHandlerTestCa
processor.destroy();
}
}
/**
 * With only the server attribute configured (no 'query'), building the iterator is expected
 * to raise a {@link DataImportHandlerException}.
 */
@Test (expected = DataImportHandlerException.class)
public void testNoQuery() {
  SolrEntityProcessor processor = new SolrEntityProcessor();
  // Plain HashMap rather than double-brace initialization, which creates an anonymous subclass.
  HashMap<String,String> entityAttrs = new HashMap<>();
  entityAttrs.put(SolrEntityProcessor.SOLR_SERVER, "http://route:66/no");
  processor.init(getContext(null, null, null, null, Collections.emptyList(),
      entityAttrs));
  try {
    processor.buildIterator();
  } finally {
    processor.destroy();
  }
}
/**
 * Without cursorMark='true' the iterator pages by offset: the next-page query carries
 * start=0 and a time-allowed value, and no cursor mark.
 */
public void testPagingQuery() {
  SolrEntityProcessor processor = new NoNextMockProcessor();
  // Plain HashMap rather than double-brace initialization, which creates an anonymous subclass.
  HashMap<String,String> entityAttrs = new HashMap<>();
  entityAttrs.put(SolrEntityProcessor.SOLR_SERVER, "http://route:66/no");
  if (random().nextBoolean()) {
    // only 'true' enables the cursor, not '*'
    List<String> noCursor = Arrays.asList("", "false", CursorMarkParams.CURSOR_MARK_START);
    Collections.shuffle(noCursor, random());
    entityAttrs.put(CursorMarkParams.CURSOR_MARK_PARAM, noCursor.get(0));
  }
  processor.init(getContext(null, null, null, null, Collections.emptyList(),
      entityAttrs));
  try {
    processor.buildIterator();
    SolrQuery query = new SolrQuery();
    ((SolrDocumentListIterator) processor.rowIterator).passNextPage(query);
    assertEquals("0", query.get(CommonParams.START));
    assertNull(query.get(CursorMarkParams.CURSOR_MARK_PARAM));
    assertNotNull(query.get(CommonParams.TIME_ALLOWED));
  } finally {
    processor.destroy();
  }
}
/**
 * With cursorMark='true' the next-page query carries the initial cursor mark and neither a
 * start offset nor a time-allowed value.
 */
public void testCursorQuery() {
  SolrEntityProcessor processor = new NoNextMockProcessor();
  // Plain HashMap rather than double-brace initialization, which creates an anonymous subclass.
  HashMap<String,String> entityAttrs = new HashMap<>();
  entityAttrs.put(SolrEntityProcessor.SOLR_SERVER, "http://route:66/no");
  entityAttrs.put(CursorMarkParams.CURSOR_MARK_PARAM, "true");
  processor.init(getContext(null, null, null, null, Collections.emptyList(),
      entityAttrs));
  try {
    processor.buildIterator();
    SolrQuery query = new SolrQuery();
    ((SolrDocumentListIterator) processor.rowIterator).passNextPage(query);
    assertNull(query.get(CommonParams.START));
    assertEquals(CursorMarkParams.CURSOR_MARK_START, query.get(CursorMarkParams.CURSOR_MARK_PARAM));
    assertNull(query.get(CommonParams.TIME_ALLOWED));
  } finally {
    processor.destroy();
  }
}
private List<Doc> generateUniqueDocs(int numDocs) {
List<FldType> types = new ArrayList<>();