NIFI-4516 Added QuerySolr after rebase

This closes #2517

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
JohannesDaniel 2018-04-12 15:00:47 +02:00 committed by Mike Thomsen
parent a0c9bebe24
commit aa196bc01f
10 changed files with 2265 additions and 72 deletions

View File

@ -63,6 +63,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -74,6 +75,11 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
@ -103,19 +109,19 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- Need to declare the newer versions of these b/c NiFi uses Lucene 4.10.3 -->
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<!-- Need to declare the newer versions of these b/c NiFi uses Lucene 4.10.3 -->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
@ -134,12 +140,6 @@
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.xmlunit</groupId>
<artifactId>xmlunit-matchers</artifactId>

View File

@ -46,7 +46,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@ -85,6 +84,7 @@ import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -106,15 +106,6 @@ public class GetSolr extends SolrProcessor {
.defaultValue(MODE_XML.getValue())
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor
.Builder().name("Record Writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to write Solr documents to FlowFiles. Must be set if \"Records\" is used as return type.")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor
.Builder().name("Solr Query")
.displayName("Solr Query")
@ -376,7 +367,8 @@ public class GetSolr extends SolrProcessor {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
} else {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions()
.asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = writerFactory.getSchema(null, null);
final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer();

View File

@ -47,14 +47,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -124,7 +120,6 @@ public class PutSolrContentStream extends SolrProcessor {
public static final String COLLECTION_PARAM_NAME = "collection";
public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@ -194,7 +189,7 @@ public class PutSolrContentStream extends SolrProcessor {
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH).evaluateAttributeExpressions(flowFile).getValue();
final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
final MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile));
StopWatch timer = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@ -292,36 +287,4 @@ public class PutSolrContentStream extends SolrProcessor {
}
return foundIOException;
}
// get all of the dynamic properties and values into a Map for later adding to the Solr request
private Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
final Map<String,String[]> paramsMap = new HashMap<>();
final SortedMap<String,String> repeatingParams = new TreeMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
final String paramName = descriptor.getName();
final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
if (!paramValue.trim().isEmpty()) {
if (paramName.matches(REPEATING_PARAM_PATTERN)) {
repeatingParams.put(paramName, paramValue);
} else {
MultiMapSolrParams.addParam(paramName, paramValue, paramsMap);
}
}
}
}
for (final Map.Entry<String,String> entry : repeatingParams.entrySet()) {
final String paramName = entry.getKey();
final String paramValue = entry.getValue();
final int idx = paramName.lastIndexOf(".");
MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap);
}
return paramsMap;
}
}

View File

@ -0,0 +1,615 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import com.google.gson.stream.JsonWriter;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.FacetField;
import org.apache.solr.client.solrj.response.FieldStatsInfo;
import org.apache.solr.client.solrj.response.IntervalFacet;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RangeFacet;
import org.apache.solr.client.solrj.response.RangeFacet.Count;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.FacetParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.params.StatsParams;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
@Tags({"Apache", "Solr", "Get", "Query", "Records"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer")
@WritesAttributes({
@WritesAttribute(attribute = "solr.connect", description = "Solr connect string"),
@WritesAttribute(attribute = "solr.collection", description = "Solr collection"),
@WritesAttribute(attribute = "solr.query", description = "Query string sent to Solr"),
@WritesAttribute(attribute = "solr.cursor.mark", description = "Cursor mark can be used for scrolling Solr"),
@WritesAttribute(attribute = "solr.status.code", description = "Status code of Solr request. A status code of 0 indicates that the request was successfully processed"),
@WritesAttribute(attribute = "solr.query.time", description = "The elapsed time to process the query (in ms)"),
@WritesAttribute(attribute = "solr.start", description = "Solr start parameter (result offset) for the query"),
@WritesAttribute(attribute = "solr.rows", description = "Number of Solr documents to be returned for the query"),
@WritesAttribute(attribute = "solr.number.results", description = "Number of Solr documents that match the query"),
@WritesAttribute(attribute = "mime.type", description = "The mime type of the data format"),
@WritesAttribute(attribute = "querysolr.exeption.class", description = "The Java exception class raised when the processor fails"),
@WritesAttribute(attribute = "querysolr.exeption.message", description = "The Java exception message raised when the processor fails")
})
public class QuerySolr extends SolrProcessor {
public static final AllowableValue MODE_XML = new AllowableValue("XML");
public static final AllowableValue MODE_REC = new AllowableValue("Records");
public static final AllowableValue RETURN_TOP_RESULTS = new AllowableValue("return_only_top_results", "Only top results");
public static final AllowableValue RETURN_ALL_RESULTS = new AllowableValue("return_all_results", "Entire results");
public static final String MIME_TYPE_JSON = "application/json";
public static final String MIME_TYPE_XML = "application/xml";
public static final String ATTRIBUTE_SOLR_CONNECT = "solr.connect";
public static final String ATTRIBUTE_SOLR_COLLECTION = "solr.collection";
public static final String ATTRIBUTE_SOLR_QUERY = "solr.query";
public static final String ATTRIBUTE_CURSOR_MARK = "solr.cursor.mark";
public static final String ATTRIBUTE_SOLR_STATUS = "solr.status.code";
public static final String ATTRIBUTE_SOLR_START = "solr.start";
public static final String ATTRIBUTE_SOLR_ROWS = "solr.rows";
public static final String ATTRIBUTE_SOLR_NUMBER_RESULTS = "solr.number.results";
public static final String ATTRIBUTE_QUERY_TIME = "solr.query.time";
public static final String EXCEPTION = "querysolr.exeption";
public static final String EXCEPTION_MESSAGE = "querysolr.exeption.message";
public static final Integer UPPER_LIMIT_START_PARAM = 10000;
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor
.Builder().name("return_type")
.displayName("Return Type")
.description("Output format of Solr results. Write Solr documents to FlowFiles as XML or using a Record Writer")
.required(true)
.allowableValues(MODE_XML, MODE_REC)
.defaultValue(MODE_XML.getValue())
.build();
public static final PropertyDescriptor SOLR_PARAM_QUERY = new PropertyDescriptor
.Builder().name("solr_param_query")
.displayName("Solr Query")
.description("Solr Query, e. g. field:value")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("*:*")
.build();
public static final PropertyDescriptor SOLR_PARAM_REQUEST_HANDLER = new PropertyDescriptor
.Builder().name("solr_param_request_handler")
.displayName("Request Handler")
.description("Define a request handler here, e. g. /query")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("/select")
.build();
public static final PropertyDescriptor SOLR_PARAM_FIELD_LIST = new PropertyDescriptor
.Builder().name("solr_param_field_list")
.displayName("Field List")
.description("Comma separated list of fields to be included into results, e. g. field1,field2")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SOLR_PARAM_SORT = new PropertyDescriptor
.Builder().name("solr_param_sort")
.displayName("Sorting of result list")
.description("Comma separated sort clauses to define the sorting of results, e. g. field1 asc, field2 desc")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SOLR_PARAM_START = new PropertyDescriptor
.Builder().name("solr_param_start")
.displayName("Start of results")
.description("Offset of result set")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SOLR_PARAM_ROWS = new PropertyDescriptor
.Builder().name("solr_param_rows")
.displayName("Rows")
.description("Number of results to be returned for a single request")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor AMOUNT_DOCUMENTS_TO_RETURN = new PropertyDescriptor
.Builder().name("amount_documents_to_return")
.displayName("Total amount of returned results")
.description("Total amount of Solr documents to be returned. If this property is set to \"Only top results\", " +
"only single requests will be sent to Solr and the results will be written into single FlowFiles. If it is set to " +
"\"Entire results\", all results matching to the query are retrieved via multiple Solr requests and " +
"returned in multiple FlowFiles. For both options, the number of Solr documents to be returned in a FlowFile depends on " +
"the configuration of the \"Rows\" property")
.required(true)
.allowableValues(RETURN_ALL_RESULTS, RETURN_TOP_RESULTS)
.defaultValue(RETURN_TOP_RESULTS.getValue())
.build();
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value to send for the '" + propertyDescriptorName + "' Solr parameter")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
public static final Relationship RESULTS = new Relationship.Builder().name("results")
.description("Results of Solr queries").build();
public static final Relationship FACETS = new Relationship.Builder().name("facets")
.description("Results of faceted search").build();
public static final Relationship STATS = new Relationship.Builder().name("stats")
.description("Stats about Solr index").build();
public static final Relationship ORIGINAL = new Relationship.Builder().name("original")
.description("Original flowfile").build();
public static final Relationship FAILURE = new Relationship.Builder().name("failure")
.description("Failure relationship").build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.descriptors;
}
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(COLLECTION);
descriptors.add(RETURN_TYPE);
descriptors.add(RECORD_WRITER);
descriptors.add(SOLR_PARAM_QUERY);
descriptors.add(SOLR_PARAM_REQUEST_HANDLER);
descriptors.add(SOLR_PARAM_FIELD_LIST);
descriptors.add(SOLR_PARAM_SORT);
descriptors.add(SOLR_PARAM_START);
descriptors.add(SOLR_PARAM_ROWS);
descriptors.add(AMOUNT_DOCUMENTS_TO_RETURN);
descriptors.add(JAAS_CLIENT_APP_NAME);
descriptors.add(BASIC_USERNAME);
descriptors.add(BASIC_PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(SOLR_SOCKET_TIMEOUT);
descriptors.add(SOLR_CONNECTION_TIMEOUT);
descriptors.add(SOLR_MAX_CONNECTIONS);
descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
descriptors.add(ZK_CLIENT_TIMEOUT);
descriptors.add(ZK_CONNECTION_TIMEOUT);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(FAILURE);
relationships.add(RESULTS);
relationships.add(FACETS);
relationships.add(STATS);
relationships.add(ORIGINAL);
this.relationships = Collections.unmodifiableSet(relationships);
}
public static final Set<String> SUPPORTED_SEARCH_COMPONENTS = new HashSet<>();
static {
SUPPORTED_SEARCH_COMPONENTS.addAll(Arrays.asList(StatsParams.STATS, FacetParams.FACET));
}
public static final Set<String> SEARCH_COMPONENTS_ON = new HashSet<>();
static {
SEARCH_COMPONENTS_ON.addAll(Arrays.asList("true", "on", "yes"));
}
@Override
protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
final Collection<ValidationResult> problems = new ArrayList<>();
if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
&& !context.getProperty(RECORD_WRITER).isSet()) {
problems.add(new ValidationResult.Builder()
.explanation("for writing records a record writer has to be configured")
.valid(false)
.subject("Record writer check")
.build());
}
return problems;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
FlowFile flowFileOriginal = session.get();
FlowFile flowFileResponse;
if (flowFileOriginal == null) {
if (context.hasNonLoopConnection()) {
return;
}
flowFileResponse = session.create();
} else {
flowFileResponse = session.create(flowFileOriginal);
}
final SolrQuery solrQuery = new SolrQuery();
final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFileResponse).getValue();
final StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(getSolrLocation());
if (isSolrCloud) {
transitUri.append(":").append(collection);
}
final StopWatch timer = new StopWatch(false);
try {
solrQuery.setQuery(context.getProperty(SOLR_PARAM_QUERY).evaluateAttributeExpressions(flowFileResponse).getValue());
solrQuery.setRequestHandler(context.getProperty(SOLR_PARAM_REQUEST_HANDLER).evaluateAttributeExpressions(flowFileResponse).getValue());
if (context.getProperty(SOLR_PARAM_FIELD_LIST).isSet()) {
for (final String field : context.getProperty(SOLR_PARAM_FIELD_LIST).evaluateAttributeExpressions(flowFileResponse).getValue()
.split(",")) {
solrQuery.addField(field.trim());
}
}
// Avoid ArrayIndexOutOfBoundsException due to incorrectly configured sorting
try {
if (context.getProperty(SOLR_PARAM_SORT).isSet()) {
final List<SolrQuery.SortClause> sortings = new ArrayList<>();
for (final String sorting : context.getProperty(SOLR_PARAM_SORT).evaluateAttributeExpressions(flowFileResponse).getValue()
.split(",")) {
final String[] sortEntry = sorting.trim().split(" ");
sortings.add(new SolrQuery.SortClause(sortEntry[0], sortEntry[1]));
}
solrQuery.setSorts(sortings);
}
} catch (Exception e) {
throw new ProcessException("Error while parsing the sort clauses for the Solr query");
}
final Integer startParam = context.getProperty(SOLR_PARAM_START).isSet() ? Integer.parseInt(
context.getProperty(SOLR_PARAM_START).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.START_DEFAULT;
solrQuery.setStart(startParam);
final Integer rowParam = context.getProperty(SOLR_PARAM_ROWS).isSet() ? Integer.parseInt(
context.getProperty(SOLR_PARAM_ROWS).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.ROWS_DEFAULT;
solrQuery.setRows(rowParam);
final Map<String,String[]> additionalSolrParams = SolrUtils.getRequestParams(context, flowFileResponse);
final Set<String> searchComponents = extractSearchComponents(additionalSolrParams);
solrQuery.add(new MultiMapSolrParams(additionalSolrParams));
final Map<String,String> attributes = new HashMap<>();
attributes.put(ATTRIBUTE_SOLR_CONNECT, getSolrLocation());
if (isSolrCloud) {
attributes.put(ATTRIBUTE_SOLR_COLLECTION, collection);
}
attributes.put(ATTRIBUTE_SOLR_QUERY, solrQuery.toString());
if (flowFileOriginal != null) {
flowFileOriginal = session.putAllAttributes(flowFileOriginal, attributes);
}
flowFileResponse = session.putAllAttributes(flowFileResponse, attributes);
final boolean getEntireResults = RETURN_ALL_RESULTS.equals(context.getProperty(AMOUNT_DOCUMENTS_TO_RETURN).getValue());
boolean processFacetsAndStats = true;
boolean continuePaging = true;
while (continuePaging){
timer.start();
Map<String,String> responseAttributes = new HashMap<>();
responseAttributes.put(ATTRIBUTE_SOLR_START, solrQuery.getStart().toString());
responseAttributes.put(ATTRIBUTE_SOLR_ROWS, solrQuery.getRows().toString());
if (solrQuery.getStart() > UPPER_LIMIT_START_PARAM) {
logger.warn("The start parameter of Solr query {} exceeded the upper limit of {}. The query will not be processed " +
"to avoid performance or memory issues on the part of Solr.", new Object[]{solrQuery.toString(), UPPER_LIMIT_START_PARAM});
flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes);
timer.stop();
break;
}
final QueryRequest req = new QueryRequest(solrQuery);
if (isBasicAuthEnabled()) {
req.setBasicAuthCredentials(getUsername(), getPassword());
}
final QueryResponse response = req.process(getSolrClient());
timer.stop();
final Long totalNumberOfResults = response.getResults().getNumFound();
responseAttributes.put(ATTRIBUTE_SOLR_NUMBER_RESULTS, totalNumberOfResults.toString());
responseAttributes.put(ATTRIBUTE_CURSOR_MARK, response.getNextCursorMark());
responseAttributes.put(ATTRIBUTE_SOLR_STATUS, String.valueOf(response.getStatus()));
responseAttributes.put(ATTRIBUTE_QUERY_TIME, String.valueOf(response.getQTime()));
flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes);
if (response.getResults().size() > 0) {
if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){
flowFileResponse = session.write(flowFileResponse, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response));
flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_XML);
} else {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions(flowFileResponse)
.asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = writerFactory.getSchema(flowFileResponse.getAttributes(), null);
final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer();
flowFileResponse = session.write(flowFileResponse, out -> {
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
writer.write(recordSet);
writer.flush();
mimeType.append(writer.getMimeType());
} catch (SchemaNotFoundException e) {
throw new ProcessException("Could not parse Solr response", e);
}
});
flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), mimeType.toString());
}
if (processFacetsAndStats) {
if (searchComponents.contains(FacetParams.FACET)) {
FlowFile flowFileFacets = session.create(flowFileResponse);
flowFileFacets = session.write(flowFileFacets, out -> {
try (
final OutputStreamWriter osw = new OutputStreamWriter(out);
final JsonWriter writer = new JsonWriter(osw)
) {
addFacetsFromSolrResponseToJsonWriter(response, writer);
}
});
flowFileFacets = session.putAttribute(flowFileFacets, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON);
session.getProvenanceReporter().receive(flowFileFacets, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFileFacets, FACETS);
}
if (searchComponents.contains(StatsParams.STATS)) {
FlowFile flowFileStats = session.create(flowFileResponse);
flowFileStats = session.write(flowFileStats, out -> {
try (
final OutputStreamWriter osw = new OutputStreamWriter(out);
final JsonWriter writer = new JsonWriter(osw)
) {
addStatsFromSolrResponseToJsonWriter(response, writer);
}
});
flowFileStats = session.putAttribute(flowFileStats, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON);
session.getProvenanceReporter().receive(flowFileStats, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFileStats, STATS);
}
processFacetsAndStats = false;
}
}
if (getEntireResults) {
final Integer totalDocumentsReturned = solrQuery.getStart() + solrQuery.getRows();
if (totalDocumentsReturned < totalNumberOfResults) {
solrQuery.setStart(totalDocumentsReturned);
session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFileResponse, RESULTS);
flowFileResponse = session.create(flowFileResponse);
} else {
continuePaging = false;
}
} else {
continuePaging = false;
}
}
} catch (Exception e) {
flowFileResponse = session.penalize(flowFileResponse);
flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION, e.getClass().getName());
flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION_MESSAGE, e.getMessage());
session.transfer(flowFileResponse, FAILURE);
logger.error("Failed to execute query {} due to {}. FlowFile will be routed to relationship failure", new Object[]{solrQuery.toString(), e}, e);
if (flowFileOriginal != null) {
flowFileOriginal = session.penalize(flowFileOriginal);
}
}
if (!flowFileResponse.isPenalized()) {
session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFileResponse, RESULTS);
}
if (flowFileOriginal != null) {
if (!flowFileOriginal.isPenalized()) {
session.transfer(flowFileOriginal, ORIGINAL);
} else {
session.remove(flowFileOriginal);
}
}
}
private Set<String> extractSearchComponents(Map<String,String[]> solrParams) {
final Set<String> searchComponentsTemp = new HashSet<>();
for (final String searchComponent : SUPPORTED_SEARCH_COMPONENTS)
if (solrParams.keySet().contains(searchComponent)) {
if (SEARCH_COMPONENTS_ON.contains(solrParams.get(searchComponent)[0])) {
searchComponentsTemp.add(searchComponent);
}
}
return Collections.unmodifiableSet(searchComponentsTemp);
}
private static void addStatsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
writer.beginObject();
writer.name("stats_fields");
writer.beginObject();
for (Map.Entry<String,FieldStatsInfo> entry: response.getFieldStatsInfo().entrySet()) {
FieldStatsInfo fsi = entry.getValue();
writer.name(entry.getKey());
writer.beginObject();
writer.name("min").value(fsi.getMin().toString());
writer.name("max").value(fsi.getMax().toString());
writer.name("count").value(fsi.getCount());
writer.name("missing").value(fsi.getMissing());
writer.name("sum").value(fsi.getSum().toString());
writer.name("mean").value(fsi.getMean().toString());
writer.name("sumOfSquares").value(fsi.getSumOfSquares());
writer.name("stddev").value(fsi.getStddev());
writer.endObject();
}
writer.endObject();
writer.endObject();
}
/**
 * Serializes all four facet categories of a Solr response (facet queries,
 * facet fields, range facets, interval facets) into a single JSON object.
 * The JSON structure is defined by the exact sequence of JsonWriter calls;
 * each category becomes one top-level property.
 */
private static void addFacetsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
writer.beginObject();
// facet_queries: array of {facet, count} objects, one per configured facet query.
writer.name("facet_queries");
writer.beginArray();
for (final Map.Entry<String,Integer> facetQuery : response.getFacetQuery().entrySet()){
writer.beginObject();
writer.name("facet").value(facetQuery.getKey());
writer.name("count").value(facetQuery.getValue());
writer.endObject();
}
writer.endArray();
// facet_fields: object keyed by field name, each value an array of {facet, count}.
writer.name("facet_fields");
writer.beginObject();
for (final FacetField facetField : response.getFacetFields()){
writer.name(facetField.getName());
writer.beginArray();
for (final FacetField.Count count : facetField.getValues()) {
writer.beginObject();
writer.name("facet").value(count.getName());
writer.name("count").value(count.getCount());
writer.endObject();
}
writer.endArray();
}
writer.endObject();
// facet_ranges: object keyed by field name, each value an array of {facet, count}
// where "facet" is the lower bound of the range bucket.
writer.name("facet_ranges");
writer.beginObject();
for (final RangeFacet rangeFacet : response.getFacetRanges()) {
writer.name(rangeFacet.getName());
writer.beginArray();
final List<Count> list = rangeFacet.getCounts();
for (final Count count : list) {
writer.beginObject();
writer.name("facet").value(count.getValue());
writer.name("count").value(count.getCount());
writer.endObject();
}
writer.endArray();
}
writer.endObject();
// facet_intervals: object keyed by field name, each value an array of {facet, count}
// where "facet" is the interval key (e.g. "[4,7]").
writer.name("facet_intervals");
writer.beginObject();
for (final IntervalFacet intervalFacet : response.getIntervalFacets()) {
writer.name(intervalFacet.getField());
writer.beginArray();
for (final IntervalFacet.Count count : intervalFacet.getIntervals()) {
writer.beginObject();
writer.name("facet").value(count.getKey());
writer.name("count").value(count.getCount());
writer.endObject();
}
writer.endArray();
}
writer.endObject();
writer.endObject();
}
}

View File

@ -28,8 +28,11 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
@ -48,15 +51,19 @@ import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.MultiMapSolrParams;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
public class SolrUtils {
@ -67,6 +74,15 @@ public class SolrUtils {
public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
"Standard", "Standard", "A stand-alone Solr instance.");
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor
.Builder().name("Record Writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to write Solr documents to FlowFiles. Must be set if \"Records\" is used as return type.")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor
.Builder().name("Solr Type")
.description("The type of Solr instance, Cloud or Standard.")
@ -176,6 +192,8 @@ public class SolrUtils {
.defaultValue("10 seconds")
.build();
public static final String REPEATING_PARAM_PATTERN = "[\\w\\.]+\\.\\d+$";
public static SolrClient createSolrClient(final PropertyContext context, final String solrLocation) {
final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@ -220,8 +238,6 @@ public class SolrUtils {
}
}
/**
* Writes each SolrDocument to a record.
*/
@ -245,7 +261,6 @@ public class SolrUtils {
return new ListRecordSet(schema, lr);
}
public static OutputStreamCallback getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) {
return new QueryResponseOutputStreamCallback(response);
}
@ -281,5 +296,33 @@ public class SolrUtils {
}
}
/**
 * Collects Solr request parameters from the processor's dynamic properties,
 * evaluating expression language against the given FlowFile.
 *
 * Parameters whose names match {@code REPEATING_PARAM_PATTERN} (e.g. "fq.1",
 * "fq.2") are gathered in sorted order and added under their base name, so a
 * single Solr parameter can be supplied multiple times deterministically.
 *
 * @param context  the process context providing the configured properties
 * @param flowFile FlowFile used for expression-language evaluation (may be null)
 * @return map of Solr parameter names to their value arrays
 */
public static Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
    final Map<String,String[]> paramsMap = new HashMap<>();
    final SortedMap<String,String> repeatingParams = new TreeMap<>();
    for (final Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
        final PropertyDescriptor descriptor = property.getKey();
        if (!descriptor.isDynamic()) {
            continue;
        }
        final String name = descriptor.getName();
        final String value = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
        if (value.trim().isEmpty()) {
            continue;  // blank values are ignored entirely
        }
        if (name.matches(REPEATING_PARAM_PATTERN)) {
            // Suffixed parameters are buffered in a TreeMap so they are appended
            // in a deterministic (sorted) order below.
            repeatingParams.put(name, value);
        } else {
            MultiMapSolrParams.addParam(name, value, paramsMap);
        }
    }
    for (final Map.Entry<String,String> repeating : repeatingParams.entrySet()) {
        final String suffixedName = repeating.getKey();
        // Strip the numeric suffix: "fq.1" -> "fq".
        final String baseName = suffixedName.substring(0, suffixedName.lastIndexOf('.'));
        MultiMapSolrParams.addParam(baseName, repeating.getValue(), paramsMap);
    }
    return paramsMap;
}
}

View File

@ -14,3 +14,4 @@
# limitations under the License.
org.apache.nifi.processors.solr.PutSolrContentStream
org.apache.nifi.processors.solr.GetSolr
org.apache.nifi.processors.solr.QuerySolr

View File

@ -0,0 +1,142 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>QuerySolr</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Usage Example</h2>
<p>
This processor queries Solr and writes the results to FlowFiles. It can be used both at the
beginning of a dataflow and within one. Solr results can be written to FlowFiles as Solr XML or via
record-oriented writers (supporting CSV, JSON, etc.). Additionally, facets and stats can be retrieved;
they are written to FlowFiles as JSON and sent to the designated relationships.
</p>
<p>
The processor can either be configured to retrieve only top results or full result sets. However,
it should be emphasized that this processor is not designed to export large result sets from Solr.
If the processor is configured to return full result sets, the configured number of rows per
request will be used as batch size and the processor will iteratively increase the start parameter
returning results in one FlowFile per request. The processor will stop iterating through results as
soon as the start parameter exceeds 10000. To export large result sets, consider using the
GetSolr processor instead. In principle, it is also possible to embed this processor into a
dataflow that iterates through results using the attribute solr.cursor.mark, which is added to FlowFiles
for each request. Note that the use of Solr's cursor mark requires queries to fulfill several preconditions
(see the Solr documentation on deep paging for details).
</p>
<p>
The most common Solr parameters can be defined via processor properties. Other parameters have to be set via
dynamic properties.
</p>
<p>
Parameters that can be set multiple times also have to be defined as dynamic properties
(e.g. fq, facet.field, stats.field). If these parameters must be set multiple times with different values,
properties can follow a naming convention:
name.number, where name is the parameter name and number is a unique number.
Repeating parameters will be sorted by their property name.
</p>
<p>
Example: Defining the fq parameter multiple times
</p>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>fq.1</td>
<td><code>field1:value1</code></td>
</tr>
<tr>
<td>fq.2</td>
<td><code>field2:value2</code></td>
</tr>
<tr>
<td>fq.3</td>
<td><code>field3:value3</code></td>
</tr>
</table>
<p>
This definition will be appended to the Solr URL as follows:
fq=field1:value1&fq=field2:value2&fq=field3:value3
</p>
<p>
Facets and stats can be activated setting the respective Solr parameters as dynamic properties. Example:
</p>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>facet</td>
<td><code>true</code></td>
</tr>
<tr>
<td>facet.field</td>
<td><code>fieldname</code></td>
</tr>
<tr>
<td>stats</td>
<td><code>true</code></td>
</tr>
<tr>
<td>stats.field</td>
<td><code>fieldname</code></td>
</tr>
</table>
<p>
Multiple fields for facets or stats can be defined in the same way as it is described for multiple filter queries:
</p>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>facet</td>
<td><code>true</code></td>
</tr>
<tr>
<td>facet.field.1</td>
<td><code>firstField</code></td>
</tr>
<tr>
<td>facet.field.2</td>
<td><code>secondField</code></td>
</tr>
</table>
<p>
This definition will be appended to the Solr URL as follows:
facet=true&facet.field=firstField&facet.field=secondField
</p>
</body>
</html>

View File

@ -0,0 +1,640 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import com.google.gson.stream.JsonReader;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.SolrInputDocument;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.xmlunit.matchers.CompareMatcher;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
public class QuerySolrIT {
/*
This integration test expects a Solr instance running locally in SolrCloud mode, coordinated by a single ZooKeeper
instance accessible with the ZooKeeper-Connect-String "localhost:2181".
*/
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
private static final SimpleDateFormat DATE_FORMAT_SOLR_COLLECTION = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss", Locale.US);
private static String SOLR_COLLECTION;
private static String ZK_CONFIG_PATH;
private static String ZK_CONFIG_NAME;
private static String SOLR_LOCATION = "localhost:2181";
static {
DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
Date date = new Date();
SOLR_COLLECTION = DATE_FORMAT_SOLR_COLLECTION.format(date) + "_QuerySolrIT";
ZK_CONFIG_PATH = "src/test/resources/solr/testCollection/conf";
ZK_CONFIG_NAME = "QuerySolrIT_config";
}
// Creates the test collection (uploading the ZooKeeper config if necessary) and
// indexes ten documents with deterministic field values used by all tests.
// NOTE(review): assumes a SolrCloud instance coordinated by ZooKeeper at
// localhost:2181 — confirm before running this IT.
@BeforeClass
public static void setup() throws IOException, SolrServerException {
CloudSolrClient solrClient = createSolrClient();
Path currentDir = Paths.get(ZK_CONFIG_PATH);
solrClient.uploadConfig(currentDir, ZK_CONFIG_NAME);
solrClient.setDefaultCollection(SOLR_COLLECTION);
// Create the collection only if it does not exist yet; otherwise wipe any
// leftover documents so every run starts from a clean index.
if (!solrClient.getZkStateReader().getClusterState().hasCollection(SOLR_COLLECTION)) {
CollectionAdminRequest.Create createCollection = CollectionAdminRequest.createCollection(SOLR_COLLECTION, ZK_CONFIG_NAME, 1, 1);
createCollection.process(solrClient);
} else {
solrClient.deleteByQuery("*:*");
}
// Ten documents: single- and multi-valued string/integer fields plus a
// double and a creation timestamp, with values derived from the loop index.
for (int i = 0; i < 10; i++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "doc" + i);
Date date = new Date();
doc.addField("created", DATE_FORMAT.format(date));
doc.addField("string_single", "single" + i + ".1");
doc.addField("string_multi", "multi" + i + ".1");
doc.addField("string_multi", "multi" + i + ".2");
doc.addField("integer_single", i);
doc.addField("integer_multi", 1);
doc.addField("integer_multi", 2);
doc.addField("integer_multi", 3);
doc.addField("double_single", 0.5 + i);
solrClient.add(doc);
}
solrClient.commit();
}
// Builds a CloudSolrClient for the test ZooKeeper ensemble and selects the
// test collection as default.
// NOTE(review): on failure the exception is only printed and null is returned,
// so callers will fail later with an NPE rather than at the point of failure.
public static CloudSolrClient createSolrClient() {
CloudSolrClient solrClient = null;
try {
solrClient = new CloudSolrClient.Builder().withZkHost(SOLR_LOCATION).build();
solrClient.setDefaultCollection(SOLR_COLLECTION);
} catch (Exception e) {
e.printStackTrace();
}
return solrClient;
}
/**
 * Deletes the test collection after all tests have run. Cleanup is
 * best-effort: failures are logged but intentionally not rethrown so they
 * cannot mask an actual test failure.
 */
@AfterClass
public static void teardown() {
    try {
        CloudSolrClient solrClient = createSolrClient();
        CollectionAdminRequest.Delete deleteCollection = CollectionAdminRequest.deleteCollection(SOLR_COLLECTION);
        deleteCollection.process(solrClient);
        solrClient.close();
    } catch (Exception e) {
        // Previously swallowed silently; log so cleanup problems are visible.
        e.printStackTrace();
    }
}
// Wraps the given SolrClient in a TestableProcessor and returns a TestRunner
// pre-configured for SolrCloud mode against the test collection.
private TestRunner createRunnerWithSolrClient(SolrClient solrClient) {
final TestableProcessor proc = new TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:2181");
runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION);
return runner;
}
/**
 * Exercises all four facet categories (queries, fields, ranges, intervals)
 * and verifies the checksums of the "count" values in the facets FlowFile.
 * Fixes the previously inconsistent JUnit argument order: expected value
 * first, actual value second.
 */
@Test
public void testAllFacetCategories() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty("facet", "true");
    runner.setProperty("facet.field", "integer_multi");
    runner.setProperty("facet.interval", "integer_single");
    runner.setProperty("facet.interval.set.1", "[4,7]");
    runner.setProperty("facet.interval.set.2", "[5,7]");
    runner.setProperty("facet.range", "created");
    runner.setProperty("facet.range.start", "NOW/MINUTE");
    runner.setProperty("facet.range.end", "NOW/MINUTE+1MINUTE");
    runner.setProperty("facet.range.gap", "+20SECOND");
    runner.setProperty("facet.query.1", "*:*");
    runner.setProperty("facet.query.2", "integer_multi:2");
    runner.setProperty("facet.query.3", "integer_multi:3");
    runner.enqueue(new ByteArrayInputStream(new byte[0]));
    runner.run();
    runner.assertTransferCount(QuerySolr.FACETS, 1);
    JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))));
    reader.beginObject();
    while (reader.hasNext()) {
        String name = reader.nextName();
        if (name.equals("facet_queries")) {
            assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader));
        } else if (name.equals("facet_fields")) {
            reader.beginObject();
            assertEquals("integer_multi", reader.nextName());
            assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader));
            reader.endObject();
        } else if (name.equals("facet_ranges")) {
            reader.beginObject();
            assertEquals("created", reader.nextName());
            assertEquals(10, returnCheckSumForArrayOfJsonObjects(reader));
            reader.endObject();
        } else if (name.equals("facet_intervals")) {
            reader.beginObject();
            assertEquals("integer_single", reader.nextName());
            assertEquals(7, returnCheckSumForArrayOfJsonObjects(reader));
            reader.endObject();
        }
    }
    reader.endObject();
    reader.close();
    solrClient.close();
}
/**
 * Consumes a JSON array of objects from the reader and sums all "count"
 * values encountered; every other property is skipped. The reader is left
 * positioned just after the array's closing bracket.
 */
private int returnCheckSumForArrayOfJsonObjects(JsonReader reader) throws IOException {
    int countSum = 0;
    reader.beginArray();
    while (reader.hasNext()) {
        reader.beginObject();
        while (reader.hasNext()) {
            final String property = reader.nextName();
            if ("count".equals(property)) {
                countSum += reader.nextInt();
            } else {
                reader.skipValue();
            }
        }
        reader.endObject();
    }
    reader.endArray();
    return countSum;
}
/**
 * Facets and stats are requested but no facet/stats fields are configured:
 * the processor must still emit FlowFiles to FACETS and STATS that contain
 * empty JSON structures. Fixes the swapped JUnit assertEquals arguments
 * (expected first) and a comment typo.
 */
@Test
public void testFacetTrueButNull() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty("facet", "true");
    runner.setProperty("stats", "true");
    runner.enqueue(new ByteArrayInputStream(new byte[0]));
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    runner.assertTransferCount(QuerySolr.FACETS, 1);
    runner.assertTransferCount(QuerySolr.STATS, 1);
    // Check for empty nested objects in the facets JSON.
    JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))));
    reader.beginObject();
    while (reader.hasNext()) {
        if (reader.nextName().equals("facet_queries")) {
            reader.beginArray();
            assertFalse(reader.hasNext());
            reader.endArray();
        } else {
            reader.beginObject();
            assertFalse(reader.hasNext());
            reader.endObject();
        }
    }
    reader.endObject();
    // The stats FlowFile must contain an empty "stats_fields" object.
    JsonReader reader_stats = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))));
    reader_stats.beginObject();
    assertEquals("stats_fields", reader_stats.nextName());
    reader_stats.beginObject();
    assertFalse(reader_stats.hasNext());
    reader_stats.endObject();
    reader_stats.endObject();
    reader.close();
    reader_stats.close();
    solrClient.close();
}
/**
 * Verifies the stats FlowFile for "integer_single": over documents 0..9 the
 * min is 0.0, max 9.0, count 10 and sum 45.0. Fixes the swapped JUnit
 * assertEquals arguments (expected first, actual second).
 */
@Test
public void testStats() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty("stats", "true");
    runner.setProperty("stats.field", "integer_single");
    runner.enqueue(new ByteArrayInputStream(new byte[0]));
    runner.run();
    runner.assertTransferCount(QuerySolr.STATS, 1);
    JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))));
    reader.beginObject();
    assertEquals("stats_fields", reader.nextName());
    reader.beginObject();
    assertEquals("integer_single", reader.nextName());
    reader.beginObject();
    while (reader.hasNext()) {
        String name = reader.nextName();
        switch (name) {
            case "min": assertEquals("0.0", reader.nextString()); break;
            case "max": assertEquals("9.0", reader.nextString()); break;
            case "count": assertEquals(10, reader.nextInt()); break;
            case "sum": assertEquals("45.0", reader.nextString()); break;
            default: reader.skipValue(); break;
        }
    }
    reader.endObject();
    reader.endObject();
    reader.endObject();
    reader.close();
    solrClient.close();
}
/**
 * Exercises the four routing combinations (with/without input connection,
 * failing/succeeding request handler) and checks the relationships and
 * attributes of the emitted FlowFiles. The runner sequence is order-sensitive:
 * each phase reconfigures the same runner and clears the transfer state.
 */
@Test
public void testRelationshipRoutings() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty("facet", "true");
runner.setProperty("stats", "true");
// Set request handler for request failure
runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler");
// Processor has no input connection and fails
runner.setNonLoopConnection(false);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
runner.clearTransferState();
// Processor has an input connection and fails
runner.setNonLoopConnection(true);
runner.enqueue(new byte[0]);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
runner.clearTransferState();
// Set request handler for successful request
runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/select");
// Processor has no input connection and succeeds
runner.setNonLoopConnection(false);
runner.run(1, false);
runner.assertTransferCount(QuerySolr.RESULTS, 1);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
runner.clearTransferState();
// Processor has an input connection and succeeds; the original FlowFile is
// additionally routed to ORIGINAL.
runner.setNonLoopConnection(true);
runner.enqueue(new byte[0]);
runner.run(1, true);
runner.assertTransferCount(QuerySolr.RESULTS, 1);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
runner.assertAllFlowFilesContainAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
runner.clearTransferState();
solrClient.close();
}
/**
 * Verifies that all query-related processor properties support expression
 * language by supplying them via FlowFile attributes. Fixes the swapped
 * assertThat arguments: the actual value comes first, the matcher (built
 * from the expected XML) second.
 */
@Test
public void testExpressionLanguageForProperties() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
    runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "${query}");
    runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "${handler}");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "${fields}");
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "${sort}");
    runner.setProperty(QuerySolr.SOLR_PARAM_START, "${start}");
    runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "${rows}");
    runner.enqueue(new byte[0], new HashMap<String,String>(){{
        put("query", "id:(doc0 OR doc1 OR doc2 OR doc3)");
        put("handler", "/select");
        put("fields", "id");
        put("sort", "id desc");
        put("start", "1");
        put("rows", "2");
    }});
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    // start=1, rows=2, sorted descending -> doc2 and doc1.
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc></docs>";
    String actualXml = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)));
    assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
    solrClient.close();
}
/**
 * Verifies that a single filter query supplied as dynamic property "fq" is
 * applied to the request. Fixes the swapped assertThat arguments (actual
 * first, matcher second).
 */
@Test
public void testSingleFilterQuery() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty("fq", "id:(doc2 OR doc3)");
    runner.enqueue(new byte[0]);
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>";
    String actualXml = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)));
    assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
    solrClient.close();
}
/**
 * Verifies that repeating filter queries ("fq.1".."fq.3") are all applied;
 * only the intersection (doc2, doc3) must be returned. Fixes the swapped
 * assertThat arguments (actual first, matcher second).
 */
@Test
public void testMultipleFilterQueries() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty("fq.1", "id:(doc0 OR doc1 OR doc2 OR doc3)");
    runner.setProperty("fq.2", "id:(doc1 OR doc2 OR doc3 OR doc4)");
    runner.setProperty("fq.3", "id:(doc2 OR doc3 OR doc4 OR doc5)");
    runner.enqueue(new byte[0]);
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>";
    String actualXml = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)));
    assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
    solrClient.close();
}
/**
 * Runs the processor without an input connection and verifies the XML result
 * content plus the standard response attributes. Fixes the swapped assertThat
 * arguments (actual first, matcher second).
 */
@Test
public void testStandardResponse() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:(doc0 OR doc1)");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id desc");
    runner.setNonLoopConnection(false);
    runner.run();
    runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1);
    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>";
    assertThat(new String(runner.getContentAsByteArray(flowFile)), CompareMatcher.isIdenticalTo(expectedXml));
    solrClient.close();
}
/**
 * Verifies that the incoming FlowFile's content is preserved on the ORIGINAL
 * relationship while the query result goes to RESULTS. Fixes the swapped
 * assertThat arguments (actual first, matcher second).
 */
@Test
public void testPreserveOriginalContent() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:doc0");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    String content = "test content 123";
    runner.enqueue(content);
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>";
    String actualXml = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)));
    assertThat(actualXml, CompareMatcher.isIdenticalTo(expectedXml));
    assertEquals(content, new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0))));
    solrClient.close();
}
/**
 * Verifies iterative retrieval of full result sets: with 10 documents and a
 * page size of 2 the processor must emit 5 result FlowFiles, each carrying
 * the Solr start offset it was retrieved with. Fixes swapped assertion
 * argument order and replaces StringBuffer with StringBuilder (no
 * synchronization needed in a single-threaded test).
 */
@Test
public void testRetrievalOfFullResults() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
    runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "2");
    runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
    runner.enqueue(new byte[0]);
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 5);
    runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
    runner.assertTransferCount(QuerySolr.STATS, 0);
    runner.assertTransferCount(QuerySolr.FACETS, 0);
    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(QuerySolr.RESULTS);
    Integer documentCounter = 0;
    Integer startParam = 0;
    for (MockFlowFile flowFile : flowFiles) {
        Map<String,String> attributes = flowFile.getAttributes();
        assertEquals(startParam.toString(), attributes.get(QuerySolr.ATTRIBUTE_SOLR_START));
        startParam += 2;
        // Each page contains the next two documents in id order.
        StringBuilder expectedXml = new StringBuilder()
                .append("<docs><doc boost=\"1.0\"><field name=\"id\">doc")
                .append(documentCounter++)
                .append("</field></doc><doc boost=\"1.0\"><field name=\"id\">doc")
                .append(documentCounter++)
                .append("</field></doc></docs>");
        assertThat(new String(runner.getContentAsByteArray(flowFile)), CompareMatcher.isIdenticalTo(expectedXml.toString()));
    }
    solrClient.close();
}
/**
 * Full result retrieval with facets and stats enabled: 10 documents with a
 * page size of 3 yield 4 result FlowFiles, while facets and stats are each
 * emitted exactly once (not once per page).
 */
@Test
public void testRetrievalOfFullResults2() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
runner.setProperty("facet", "true");
runner.setProperty("stats", "true");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 4);
runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
solrClient.close();
}
/**
 * Same as testRetrievalOfFullResults2 but without an input connection:
 * nothing is routed to ORIGINAL while results, facets and stats counts are
 * unchanged.
 */
@Test
public void testRetrievalOfFullResults3() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
runner.setProperty("facet", "true");
runner.setProperty("stats", "true");
runner.setNonLoopConnection(false);
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 4);
runner.assertTransferCount(QuerySolr.ORIGINAL, 0);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
solrClient.close();
}
/**
 * Verifies record-oriented output: the results are written as JSON records
 * via a JsonRecordSetWriter and the sum of all "integer_single" values over
 * documents 0..9 must be 45. Fixes the swapped JUnit assertEquals arguments
 * (expected first, actual second).
 */
@Test
public void testRecordResponse() throws IOException, InitializationException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.RETURN_TYPE, QuerySolr.MODE_REC.getValue());
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id,created,integer_single");
    runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "10");
    final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    runner.addControllerService("writer", jsonWriter);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
    runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
    runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
    runner.enableControllerService(jsonWriter);
    runner.setProperty(SolrUtils.RECORD_WRITER, "writer");
    runner.setNonLoopConnection(false);
    runner.run(1);
    runner.assertQueueEmpty();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
    reader.beginArray();
    int controlScore = 0;
    while (reader.hasNext()) {
        reader.beginObject();
        while (reader.hasNext()) {
            if (reader.nextName().equals("integer_single")) {
                controlScore += reader.nextInt();
            } else {
                reader.skipValue();
            }
        }
        reader.endObject();
    }
    reader.close();
    solrClient.close();
    assertEquals(45, controlScore);
}
// Test subclass that overrides createSolrClient so unit tests can inject an
// embedded SolrClient instead of opening a real connection.
private class TestableProcessor extends QuerySolr {
    // final: the injected client never changes after construction
    private final SolrClient solrClient;
    public TestableProcessor(SolrClient solrClient) {
        this.solrClient = solrClient;
    }
    @Override
    protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
        return solrClient;
    }
}
}

View File

@ -42,6 +42,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
@ -54,6 +55,13 @@ public class TestGetSolr {
static final String DEFAULT_SOLR_CORE = "testCollection";
final static SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
final static String DATE_STRING_EARLIER = "1970-01-01T00:00:00.000Z";
final static String DATE_STRING_LATER = "1970-01-01T00:00:00.001Z";
static {
DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
}
private SolrClient solrClient;
@Before
@ -62,16 +70,18 @@ public class TestGetSolr {
try {
// create an EmbeddedSolrServer for the processor to use
String relPath = getClass().getProtectionDomain().getCodeSource()
final String relPath = getClass().getProtectionDomain().getCodeSource()
.getLocation().getFile() + "../../target";
solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
DEFAULT_SOLR_CORE, relPath);
final Date date = DATE_FORMAT.parse(DATE_STRING_EARLIER);
for (int i = 0; i < 10; i++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "doc" + i);
doc.addField("created", new Date());
doc.addField("created", date);
doc.addField("string_single", "single" + i + ".1");
doc.addField("string_multi", "multi" + i + ".1");
doc.addField("string_multi", "multi" + i + ".2");
@ -210,23 +220,20 @@ public class TestGetSolr {
}
@Test
public void testInitialDateFilter() throws IOException, SolrServerException {
final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
df.setTimeZone(TimeZone.getTimeZone("GMT"));
final Date dateToFilter = new Date();
public void testInitialDateFilter() throws IOException, SolrServerException, ParseException {
final Date dateToFilter = DATE_FORMAT.parse(DATE_STRING_LATER);
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter));
runner.setProperty(GetSolr.DATE_FILTER, DATE_FORMAT.format(dateToFilter));
runner.setProperty(GetSolr.BATCH_SIZE, "1");
SolrInputDocument doc10 = new SolrInputDocument();
doc10.addField("id", "doc10");
doc10.addField("created", new Date());
doc10.addField("created", dateToFilter);
SolrInputDocument doc11 = new SolrInputDocument();
doc11.addField("id", "doc11");
doc11.addField("created", new Date());
doc11.addField("created", dateToFilter);
solrClient.add(doc10);
solrClient.add(doc11);
@ -292,7 +299,7 @@ public class TestGetSolr {
}
@Test
public void testRecordWriter() throws IOException, SolrServerException, InitializationException {
public void testRecordWriter() throws IOException, InitializationException {
final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient);
TestRunner runner = createDefaultTestRunner(proc);
@ -309,7 +316,7 @@ public class TestGetSolr {
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.enableControllerService(jsonWriter);
runner.setProperty(GetSolr.RECORD_WRITER, "writer");
runner.setProperty(SolrUtils.RECORD_WRITER, "writer");
runner.run(1,true, true);
runner.assertQueueEmpty();

View File

@ -0,0 +1,790 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import com.google.gson.stream.JsonReader;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.junit.Assert;
import org.junit.Test;
import org.xmlunit.matchers.CompareMatcher;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
public class TestQuerySolr {
// Core name used when creating the embedded Solr server for tests.
static final String DEFAULT_SOLR_CORE = "testCollection";
// Nominal connect string recorded in flowfile attributes; requests are served
// by the embedded server, not this URL.
static final String SOLR_CONNECT = "http://localhost:8443/solr";
// NOTE(review): SimpleDateFormat is not thread-safe; assumed fine here since
// these tests run single-threaded.
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
static {
    DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
}
// Embedded client shared between createSolrClient() and the test methods.
private SolrClient solrClient;
/**
 * Creates an embedded Solr core and indexes ten documents (doc0..doc9) covering
 * single- and multi-valued string/integer fields plus a double and a timestamp.
 * Fails the calling test immediately if the server cannot be created or loaded.
 */
public SolrClient createSolrClient() {
    try {
        // create an EmbeddedSolrServer for the processor to use
        final String relPath = getClass().getProtectionDomain().getCodeSource()
                .getLocation().getFile() + "../../target";
        solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME,
                DEFAULT_SOLR_CORE, relPath);
        for (int i = 0; i < 10; i++) {
            final SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "doc" + i);
            doc.addField("created", DATE_FORMAT.format(new Date()));
            doc.addField("string_single", "single" + i + ".1");
            doc.addField("string_multi", "multi" + i + ".1");
            doc.addField("string_multi", "multi" + i + ".2");
            doc.addField("integer_single", i);
            doc.addField("integer_multi", 1);
            doc.addField("integer_multi", 2);
            doc.addField("integer_multi", 3);
            doc.addField("double_single", 0.5 + i);
            solrClient.add(doc);
        }
        solrClient.commit();
    } catch (Exception e) {
        // e.toString() always includes the exception class; getMessage() may be
        // null, which would have produced an empty failure message.
        Assert.fail(e.toString());
    }
    return solrClient;
}
/**
 * Builds a TestRunner around a TestableProcessor wired to the given client,
 * configured as a standard (non-cloud) Solr instance on the default test core.
 */
private TestRunner createRunnerWithSolrClient(SolrClient solrClient) {
    final TestRunner testRunner = TestRunners.newTestRunner(new TestableProcessor(solrClient));
    testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
    testRunner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT);
    testRunner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE);
    return testRunner;
}
/**
 * Builds a TestRunner around a TestableProcessor wired to the given client,
 * configured as a SolrCloud instance on the default test core.
 */
private TestRunner createRunnerWithSolrCloudClient(SolrClient solrClient) {
    final TestRunner testRunner = TestRunners.newTestRunner(new TestableProcessor(solrClient));
    testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
    testRunner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT);
    testRunner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE);
    return testRunner;
}
/**
 * Builds a TestRunner with a null SolrClient for tests that never execute a
 * query (e.g. parameter-mapping tests); standard Solr type, default core.
 */
private TestRunner createRunner() {
    final TestRunner testRunner = TestRunners.newTestRunner(new TestableProcessor(null));
    testRunner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue());
    testRunner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT);
    testRunner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE);
    return testRunner;
}
// Verifies that dynamic properties with a numeric ".1"/".2" suffix are folded
// into one repeated Solr parameter ("facet.field") while a property whose own
// name contains dots ("f.123.facet.prefix") is passed through unchanged.
@Test
public void testRepeatingParams() {
    TestRunner runner = createRunner();
    runner.enqueue(new byte[0]);
    runner.setProperty("facet.field.1", "123");
    runner.setProperty("facet.field.2", "other_field");
    runner.setProperty("f.123.facet.prefix", "pre");
    ProcessContext context = runner.getProcessContext();
    FlowFile flowFile = runner.getProcessSessionFactory().createSession().get();
    Map<String,String[]> solrParams = SolrUtils.getRequestParams(context, flowFile);
    String[] facet_fields = solrParams.get("facet.field");
    // Both suffixed properties collapse to the same parameter name, in order.
    assertEquals(2, facet_fields.length);
    assertEquals("123", facet_fields[0]);
    assertEquals("other_field", facet_fields[1]);
    assertEquals("pre", solrParams.get("f.123.facet.prefix")[0]);
}
// Exercises all four facet categories (queries, fields, ranges, intervals) in
// one request and checks each section of the FACETS JSON via a count checksum.
@Test
public void testAllFacetCategories() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty("facet", "true");
    runner.setProperty("facet.field", "integer_multi");
    runner.setProperty("facet.interval", "integer_single");
    runner.setProperty("facet.interval.set.1", "[4,7]");
    runner.setProperty("facet.interval.set.2", "[5,7]");
    runner.setProperty("facet.range", "created");
    runner.setProperty("facet.range.start", "NOW/MINUTE");
    runner.setProperty("facet.range.end", "NOW/MINUTE+1MINUTE");
    runner.setProperty("facet.range.gap", "+20SECOND");
    runner.setProperty("facet.query.1", "*:*");
    runner.setProperty("facet.query.2", "integer_multi:2");
    runner.setProperty("facet.query.3", "integer_multi:3");
    runner.enqueue(new ByteArrayInputStream(new byte[0]));
    runner.run();
    runner.assertTransferCount(QuerySolr.FACETS, 1);
    // Parse the FACETS flowfile and validate each facet section's counts.
    JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))));
    reader.beginObject();
    while (reader.hasNext()) {
        String name = reader.nextName();
        if (name.equals("facet_queries")) {
            // 10 (*:*) + 10 (integer_multi:2) + 10 (integer_multi:3) = 30
            assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader));
        } else if (name.equals("facet_fields")) {
            reader.beginObject();
            assertEquals(reader.nextName(), "integer_multi");
            // Values 1, 2, 3 each appear in all 10 docs => 30
            assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 30);
            reader.endObject();
        } else if (name.equals("facet_ranges")) {
            reader.beginObject();
            assertEquals(reader.nextName(), "created");
            // All 10 docs were created within the queried minute
            assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 10);
            reader.endObject();
        } else if (name.equals("facet_intervals")) {
            reader.beginObject();
            assertEquals(reader.nextName(), "integer_single");
            // [4,7] matches 4 docs, [5,7] matches 3 docs => 7
            assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 7);
            reader.endObject();
        }
    }
    reader.endObject();
    reader.close();
    solrClient.close();
}
/**
 * Sums the "count" members of every JSON object in the array the reader is
 * positioned at, skipping all other members. Consumes the entire array.
 */
private int returnCheckSumForArrayOfJsonObjects(JsonReader reader) throws IOException {
    int total = 0;
    reader.beginArray();
    while (reader.hasNext()) {
        reader.beginObject();
        while (reader.hasNext()) {
            final String member = reader.nextName();
            if ("count".equals(member)) {
                total += reader.nextInt();
            } else {
                reader.skipValue();
            }
        }
        reader.endObject();
    }
    reader.endArray();
    return total;
}
// Enables faceting and stats without configuring any facet/stats fields and
// verifies the FACETS and STATS flowfiles contain only empty JSON containers.
@Test
public void testFacetTrueButNull() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty("facet", "true");
    runner.setProperty("stats", "true");
    runner.enqueue(new ByteArrayInputStream(new byte[0]));
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    runner.assertTransferCount(QuerySolr.FACETS, 1);
    runner.assertTransferCount(QuerySolr.STATS, 1);
    // Check for empty nested Objects in JSON
    JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))));
    reader.beginObject();
    while (reader.hasNext()) {
        // facet_queries is an empty array; every other section an empty object
        if (reader.nextName().equals("facet_queries")) {
            reader.beginArray();
            assertFalse(reader.hasNext());
            reader.endArray();
        } else {
            reader.beginObject();
            assertFalse(reader.hasNext());
            reader.endObject();
        }
    }
    reader.endObject();
    // STATS content: {"stats_fields": {}} since no stats.field was configured
    JsonReader reader_stats = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))));
    reader_stats.beginObject();
    assertEquals(reader_stats.nextName(), "stats_fields");
    reader_stats.beginObject();
    assertFalse(reader_stats.hasNext());
    reader_stats.endObject();
    reader_stats.endObject();
    reader.close();
    reader_stats.close();
    solrClient.close();
}
// Requests stats for "integer_single" (values 0..9) and validates min/max/count/sum
// in the STATS flowfile JSON.
@Test
public void testStats() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty("stats", "true");
    runner.setProperty("stats.field", "integer_single");
    runner.enqueue(new ByteArrayInputStream(new byte[0]));
    runner.run();
    runner.assertTransferCount(QuerySolr.STATS, 1);
    JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))));
    // Expected structure: {"stats_fields": {"integer_single": {...}}}
    reader.beginObject();
    assertEquals(reader.nextName(), "stats_fields");
    reader.beginObject();
    assertEquals(reader.nextName(), "integer_single");
    reader.beginObject();
    while (reader.hasNext()) {
        String name = reader.nextName();
        // Numeric stats are serialized as strings except "count"
        switch (name) {
            case "min": assertEquals(reader.nextString(), "0.0"); break;
            case "max": assertEquals(reader.nextString(), "9.0"); break;
            case "count": assertEquals(reader.nextInt(), 10); break;
            case "sum": assertEquals(reader.nextString(), "45.0"); break;
            default: reader.skipValue(); break;
        }
    }
    reader.endObject();
    reader.endObject();
    reader.endObject();
    reader.close();
    solrClient.close();
}
// Covers all four routing combinations: {no input connection, input connection}
// x {failing request handler, successful request handler}, checking the target
// relationships and the attributes set on each outgoing flowfile.
@Test
public void testRelationshipRoutings() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty("facet", "true");
    runner.setProperty("stats", "true");
    // Set request handler for request failure
    runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler");
    // Processor has no input connection and fails
    runner.setNonLoopConnection(false);
    runner.run(1, false);
    runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
    flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
    flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
    runner.clearTransferState();
    // Processor has an input connection and fails
    runner.setNonLoopConnection(true);
    runner.enqueue(new byte[0]);
    runner.run(1, false);
    runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
    flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
    flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
    flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
    runner.clearTransferState();
    // Set request handler for successful request
    runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/select");
    // Processor has no input connection and succeeds
    runner.setNonLoopConnection(false);
    runner.run(1, false);
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    runner.assertTransferCount(QuerySolr.FACETS, 1);
    runner.assertTransferCount(QuerySolr.STATS, 1);
    flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
    runner.clearTransferState();
    // Processor has an input connection and succeeds
    runner.setNonLoopConnection(true);
    runner.enqueue(new byte[0]);
    runner.run(1, true);
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    runner.assertTransferCount(QuerySolr.FACETS, 1);
    runner.assertTransferCount(QuerySolr.STATS, 1);
    runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
    runner.assertAllFlowFilesContainAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
    // Each of RESULTS / FACETS / STATS carries the full attribute set
    flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
    flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
    flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
    runner.clearTransferState();
    solrClient.close();
}
// Verifies that every Solr query property supports expression language by
// supplying all values via flowfile attributes; expects doc2/doc1 (sorted desc,
// start=1, rows=2 of the four matched docs).
@Test
public void testExpressionLanguageForProperties() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
    runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "${query}");
    runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "${handler}");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "${fields}");
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "${sort}");
    runner.setProperty(QuerySolr.SOLR_PARAM_START, "${start}");
    runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "${rows}");
    // Attribute values the expressions above resolve against
    runner.enqueue(new byte[0], new HashMap<String,String>(){{
        put("query", "id:(doc0 OR doc1 OR doc2 OR doc3)");
        put("handler", "/select");
        put("fields", "id");
        put("sort", "id desc");
        put("start", "1");
        put("rows", "2");
    }});
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc></docs>";
    assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
    solrClient.close();
}
// A single "fq" dynamic property restricts the *:* result set to doc2 and doc3.
@Test
public void testSingleFilterQuery() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty("fq", "id:(doc2 OR doc3)");
    runner.enqueue(new byte[0]);
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>";
    assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
    solrClient.close();
}
// Multiple suffixed "fq.N" properties are applied as separate filter queries;
// their intersection leaves only doc2 and doc3.
@Test
public void testMultipleFilterQueries() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty("fq.1", "id:(doc0 OR doc1 OR doc2 OR doc3)");
    runner.setProperty("fq.2", "id:(doc1 OR doc2 OR doc3 OR doc4)");
    runner.setProperty("fq.3", "id:(doc2 OR doc3 OR doc4 OR doc5)");
    runner.enqueue(new byte[0]);
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>";
    assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
    solrClient.close();
}
// Runs a basic query without an input connection and checks both the routed
// XML document content and the standard Solr response attributes.
@Test
public void testStandardResponse() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:(doc0 OR doc1)");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id desc");
    // Source-processor mode: no incoming flowfile
    runner.setNonLoopConnection(false);
    runner.run();
    runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1);
    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
    // Sorted descending, so doc1 precedes doc0
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>";
    assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile))));
    solrClient.close();
}
// The incoming flowfile's content must be preserved on the ORIGINAL relationship
// while the query response goes to RESULTS.
@Test
public void testPreserveOriginalContent() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:doc0");
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    String content = "test content 123";
    runner.enqueue(content);
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
    String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>";
    assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
    // Original bytes are untouched
    assertEquals(content, new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0))));
    solrClient.close();
}
// Pages through all 10 documents with page size 2 and verifies each of the 5
// RESULTS flowfiles: its solr.start attribute and its two-document XML payload.
@Test
public void testRetrievalOfFullResults() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
    runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "2");
    runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
    runner.enqueue(new byte[0]);
    runner.run();
    // 10 documents / 2 rows per page => 5 result flowfiles
    runner.assertTransferCount(QuerySolr.RESULTS, 5);
    runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
    runner.assertTransferCount(QuerySolr.STATS, 0);
    runner.assertTransferCount(QuerySolr.FACETS, 0);
    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(QuerySolr.RESULTS);
    // Primitives instead of boxed Integers; no autoboxing in the loop.
    int documentCounter = 0;
    int startParam = 0;
    for (MockFlowFile flowFile : flowFiles) {
        Map<String,String> attributes = flowFile.getAttributes();
        // Expected value first per JUnit convention
        assertEquals(String.valueOf(startParam), attributes.get(QuerySolr.ATTRIBUTE_SOLR_START));
        startParam += 2;
        // StringBuilder: single-threaded use, StringBuffer's locking is unnecessary
        StringBuilder expectedXml = new StringBuilder()
                .append("<docs><doc boost=\"1.0\"><field name=\"id\">doc")
                .append(documentCounter++)
                .append("</field></doc><doc boost=\"1.0\"><field name=\"id\">doc")
                .append(documentCounter++)
                .append("</field></doc></docs>");
        assertThat(expectedXml.toString(), CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile))));
    }
    solrClient.close();
}
// Pages through all 10 indexed documents with page size 3 (=> 4 RESULTS flowfiles)
// while facets and stats are enabled; each extra section is routed to its own relationship.
@Test
public void testRetrievalOfFullResults2() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
    runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
    runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
    runner.setProperty("facet", "true");
    runner.setProperty("stats", "true");
    runner.enqueue(new byte[0]);
    runner.run();
    // ceil(10 / 3) = 4 result pages; incoming flowfile goes to ORIGINAL
    runner.assertTransferCount(QuerySolr.RESULTS, 4);
    runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
    runner.assertTransferCount(QuerySolr.FACETS, 1);
    runner.assertTransferCount(QuerySolr.STATS, 1);
    solrClient.close();
}
// Same paging scenario as testRetrievalOfFullResults2, but without an input
// connection: no incoming flowfile exists, so nothing is routed to ORIGINAL.
@Test
public void testRetrievalOfFullResults3() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
    runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
    runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
    runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
    runner.setProperty("facet", "true");
    runner.setProperty("stats", "true");
    // Simulate a source processor (no non-loop input connection)
    runner.setNonLoopConnection(false);
    runner.run();
    runner.assertTransferCount(QuerySolr.RESULTS, 4);
    runner.assertTransferCount(QuerySolr.ORIGINAL, 0);
    runner.assertTransferCount(QuerySolr.FACETS, 1);
    runner.assertTransferCount(QuerySolr.STATS, 1);
    solrClient.close();
}
// Queries all 10 documents in record-oriented mode with a JSON record writer and
// verifies the written records by summing "integer_single" (0..9 => 45).
@Test
public void testRecordResponse() throws IOException, InitializationException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.RETURN_TYPE, QuerySolr.MODE_REC.getValue());
    runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id,created,integer_single");
    runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "10");
    final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    runner.addControllerService("writer", jsonWriter);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
    runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
    runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
    runner.enableControllerService(jsonWriter);
    runner.setProperty(SolrUtils.RECORD_WRITER, "writer");
    runner.setNonLoopConnection(false);
    runner.run(1);
    runner.assertQueueEmpty();
    runner.assertTransferCount(QuerySolr.RESULTS, 1);
    // Walk the JSON array of records and accumulate the integer_single values.
    JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
            runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
    reader.beginArray();
    int controlScore = 0;
    while (reader.hasNext()) {
        reader.beginObject();
        while (reader.hasNext()) {
            if (reader.nextName().equals("integer_single")) {
                controlScore += reader.nextInt();
            } else {
                reader.skipValue();
            }
        }
        reader.endObject();
    }
    reader.close();
    solrClient.close();
    // JUnit convention: expected value first so failure messages read correctly.
    assertEquals(45, controlScore);
}
// A start offset past the last document (only 10 exist) must still route to
// RESULTS with the requested start attribute and an empty payload.
@Test
public void testExceedStartParam() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrClient(solrClient);
    runner.setProperty(QuerySolr.SOLR_PARAM_START, "10001");
    runner.setNonLoopConnection(false);
    runner.run();
    runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1);
    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
    assertEquals("10001", flowFile.getAttribute(QuerySolr.ATTRIBUTE_SOLR_START));
    // No documents matched beyond the offset => zero-byte content
    assertEquals(0, runner.getContentAsByteArray(flowFile).length);
    solrClient.close();
}
// A failing request (nonexistent request handler) must route to FAILURE and
// carry the connection/collection/query attributes plus exception details.
@Test
public void testAttributesFailure() throws IOException {
    SolrClient solrClient = createSolrClient();
    TestRunner runner = createRunnerWithSolrCloudClient(solrClient);
    runner.setProperty("facet", "true");
    runner.setProperty("stats", "true");
    runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler");
    runner.enqueue("");
    runner.run();
    runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
    MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY);
    flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
    flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
    // Close the embedded client like every other test in this class does.
    solrClient.close();
}
/**
 * Verifies that the RESULTS, FACETS and STATS flow files each carry the full
 * set of Solr query attributes (with the expected values and MIME types), and
 * that the ORIGINAL flow file carries the connection/collection/query
 * attributes.
 */
@Test
public void testAttributes() throws IOException {
    // try-with-resources: the client is closed even if an assertion fails.
    try (SolrClient solrClient = createSolrClient()) {
        TestRunner runner = createRunnerWithSolrCloudClient(solrClient);
        runner.setProperty("facet", "true");
        runner.setProperty("stats", "true");

        runner.enqueue("");
        runner.run();

        runner.assertTransferCount(QuerySolr.RESULTS, 1);
        runner.assertTransferCount(QuerySolr.FACETS, 1);
        runner.assertTransferCount(QuerySolr.STATS, 1);
        runner.assertTransferCount(QuerySolr.ORIGINAL, 1);

        // RESULTS is XML; FACETS and STATS are JSON. All three share the same
        // query attributes, checked by the helper below.
        assertQueryResponseAttributes(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0), QuerySolr.MIME_TYPE_XML);
        assertQueryResponseAttributes(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0), QuerySolr.MIME_TYPE_JSON);
        assertQueryResponseAttributes(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0), QuerySolr.MIME_TYPE_JSON);

        // ORIGINAL only carries the connection-level attributes.
        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0);
        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION);
        flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY);
        assertEquals(SOLR_CONNECT, flowFile.getAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT));
        assertEquals(DEFAULT_SOLR_CORE, flowFile.getAttribute(QuerySolr.ATTRIBUTE_SOLR_COLLECTION));
    }
}

/**
 * Asserts the standard Solr query attributes and expected values shared by
 * the RESULTS, FACETS and STATS flow files produced by {@link #testAttributes()}.
 *
 * @param flowFile         the flow file to check
 * @param expectedMimeType expected value of the {@code mime.type} attribute
 */
private void assertQueryResponseAttributes(final MockFlowFile flowFile, final String expectedMimeType) {
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_START);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_ROWS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS);
    flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
    flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key());

    Map<String, String> attributes = flowFile.getAttributes();
    assertEquals(SOLR_CONNECT, attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT));
    assertEquals(DEFAULT_SOLR_CORE, attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION));
    assertEquals("q=*:*&qt=/select&start=0&rows=10&stats=true&facet=true", attributes.get(QuerySolr.ATTRIBUTE_SOLR_QUERY));
    assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_STATUS));
    assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_START));
    assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_ROWS));
    assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS));
    assertEquals(expectedMimeType, attributes.get(CoreAttributes.MIME_TYPE.key()));
}
// Override createSolrClient and return the passed in SolrClient
private class TestableProcessor extends QuerySolr {
private SolrClient solrClient;
public TestableProcessor(SolrClient solrClient) {
this.solrClient = solrClient;
}
@Override
protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
return solrClient;
}
}
}