From aa196bc01f438a3e2da04419f619c7baf3fc001d Mon Sep 17 00:00:00 2001 From: JohannesDaniel Date: Thu, 12 Apr 2018 15:00:47 +0200 Subject: [PATCH] NIFI-4516 Added QuerySolr after rebase This closes #2517 Signed-off-by: Mike Thomsen --- .../nifi-solr-processors/pom.xml | 16 +- .../apache/nifi/processors/solr/GetSolr.java | 14 +- .../processors/solr/PutSolrContentStream.java | 39 +- .../nifi/processors/solr/QuerySolr.java | 615 ++++++++++++++ .../nifi/processors/solr/SolrUtils.java | 49 +- .../org.apache.nifi.processor.Processor | 1 + .../solr/QuerySolr/additionalDetails.html | 142 ++++ .../nifi/processors/solr/QuerySolrIT.java | 640 ++++++++++++++ .../nifi/processors/solr/TestGetSolr.java | 31 +- .../nifi/processors/solr/TestQuerySolr.java | 790 ++++++++++++++++++ 10 files changed, 2265 insertions(+), 72 deletions(-) create mode 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java create mode 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html create mode 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java create mode 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml index 943e8f61de..5684f3767c 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml @@ -63,6 +63,7 @@ org.apache.nifi nifi-api + provided org.apache.nifi @@ -74,6 +75,11 @@ nifi-ssl-context-service-api provided + + com.google.code.gson + gson + 2.7 + org.apache.nifi @@ -103,19 +109,19 @@ junit test + org.apache.solr solr-core ${solr.version} - test 
com.fasterxml.jackson.core jackson-core + test - org.apache.lucene lucene-core @@ -134,12 +140,6 @@ ${solr.version} test - - com.google.code.gson - gson - 2.7 - test - org.xmlunit xmlunit-matchers diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java index 6260304f2a..679b02f9b2 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -46,7 +46,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; -import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -85,6 +84,7 @@ import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; @Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -106,15 +106,6 @@ public class GetSolr extends SolrProcessor { .defaultValue(MODE_XML.getValue()) .build(); - public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor - .Builder().name("Record Writer") - .displayName("Record Writer") - .description("The Record Writer to use in order to write Solr documents to FlowFiles. 
Must be set if \"Records\" is used as return type.") - .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .required(false) - .build(); - public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor .Builder().name("Solr Query") .displayName("Solr Query") @@ -376,7 +367,8 @@ public class GetSolr extends SolrProcessor { flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml"); } else { - final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions() + .asControllerService(RecordSetWriterFactory.class); final RecordSchema schema = writerFactory.getSchema(null, null); final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema); final StringBuffer mimeType = new StringBuffer(); diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java index 71f186bfc9..718f5e97f1 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -47,14 +47,10 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicReference; @@ -124,7 +120,6 @@ public class PutSolrContentStream extends SolrProcessor { public static final String COLLECTION_PARAM_NAME = "collection"; public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin"; - public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+"; private Set relationships; private List descriptors; @@ -194,7 +189,7 @@ public class PutSolrContentStream extends SolrProcessor { final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue(); final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong(); final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH).evaluateAttributeExpressions(flowFile).getValue(); - final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile)); + final MultiMapSolrParams requestParams = new MultiMapSolrParams(SolrUtils.getRequestParams(context, flowFile)); StopWatch timer = new StopWatch(true); session.read(flowFile, new InputStreamCallback() { @@ -292,36 +287,4 @@ public class PutSolrContentStream extends SolrProcessor { } return foundIOException; } - - // get all of the dynamic properties and values into a Map for later adding to the Solr request - private Map getRequestParams(ProcessContext context, FlowFile flowFile) { - final Map paramsMap = new HashMap<>(); - final SortedMap repeatingParams = new TreeMap<>(); - - for (final Map.Entry entry : context.getProperties().entrySet()) { - final PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.isDynamic()) { - final String paramName = descriptor.getName(); - final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); - - if (!paramValue.trim().isEmpty()) { - if (paramName.matches(REPEATING_PARAM_PATTERN)) { - repeatingParams.put(paramName, paramValue); - } else { - MultiMapSolrParams.addParam(paramName, 
paramValue, paramsMap); - } - } - } - } - - for (final Map.Entry entry : repeatingParams.entrySet()) { - final String paramName = entry.getKey(); - final String paramValue = entry.getValue(); - final int idx = paramName.lastIndexOf("."); - MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap); - } - - return paramsMap; - } - } diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java new file mode 100755 index 0000000000..06039d7c7f --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/QuerySolr.java @@ -0,0 +1,615 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.StopWatch; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import 
org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import org.apache.solr.common.params.StatsParams; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") 
+@WritesAttributes({ + @WritesAttribute(attribute = "solr.connect", description = "Solr connect string"), + @WritesAttribute(attribute = "solr.collection", description = "Solr collection"), + @WritesAttribute(attribute = "solr.query", description = "Query string sent to Solr"), + @WritesAttribute(attribute = "solr.cursor.mark", description = "Cursor mark can be used for scrolling Solr"), + @WritesAttribute(attribute = "solr.status.code", description = "Status code of Solr request. A status code of 0 indicates that the request was successfully processed"), + @WritesAttribute(attribute = "solr.query.time", description = "The elapsed time to process the query (in ms)"), + @WritesAttribute(attribute = "solr.start", description = "Solr start parameter (result offset) for the query"), + @WritesAttribute(attribute = "solr.rows", description = "Number of Solr documents to be returned for the query"), + @WritesAttribute(attribute = "solr.number.results", description = "Number of Solr documents that match the query"), + @WritesAttribute(attribute = "mime.type", description = "The mime type of the data format"), + @WritesAttribute(attribute = "querysolr.exeption.class", description = "The Java exception class raised when the processor fails"), + @WritesAttribute(attribute = "querysolr.exeption.message", description = "The Java exception message raised when the processor fails") +}) +public class QuerySolr extends SolrProcessor { + + public static final AllowableValue MODE_XML = new AllowableValue("XML"); + public static final AllowableValue MODE_REC = new AllowableValue("Records"); + + public static final AllowableValue RETURN_TOP_RESULTS = new AllowableValue("return_only_top_results", "Only top results"); + public static final AllowableValue RETURN_ALL_RESULTS = new AllowableValue("return_all_results", "Entire results"); + + public static final String MIME_TYPE_JSON = "application/json"; + public static final String MIME_TYPE_XML = "application/xml"; + public static final 
String ATTRIBUTE_SOLR_CONNECT = "solr.connect"; + public static final String ATTRIBUTE_SOLR_COLLECTION = "solr.collection"; + public static final String ATTRIBUTE_SOLR_QUERY = "solr.query"; + public static final String ATTRIBUTE_CURSOR_MARK = "solr.cursor.mark"; + public static final String ATTRIBUTE_SOLR_STATUS = "solr.status.code"; + public static final String ATTRIBUTE_SOLR_START = "solr.start"; + public static final String ATTRIBUTE_SOLR_ROWS = "solr.rows"; + public static final String ATTRIBUTE_SOLR_NUMBER_RESULTS = "solr.number.results"; + public static final String ATTRIBUTE_QUERY_TIME = "solr.query.time"; + public static final String EXCEPTION = "querysolr.exeption"; + public static final String EXCEPTION_MESSAGE = "querysolr.exeption.message"; + + public static final Integer UPPER_LIMIT_START_PARAM = 10000; + + public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor + .Builder().name("return_type") + .displayName("Return Type") + .description("Output format of Solr results. Write Solr documents to FlowFiles as XML or using a Record Writer") + .required(true) + .allowableValues(MODE_XML, MODE_REC) + .defaultValue(MODE_XML.getValue()) + .build(); + + public static final PropertyDescriptor SOLR_PARAM_QUERY = new PropertyDescriptor + .Builder().name("solr_param_query") + .displayName("Solr Query") + .description("Solr Query, e. g. field:value") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("*:*") + .build(); + + public static final PropertyDescriptor SOLR_PARAM_REQUEST_HANDLER = new PropertyDescriptor + .Builder().name("solr_param_request_handler") + .displayName("Request Handler") + .description("Define a request handler here, e. g. 
/query") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("/select") + .build(); + + public static final PropertyDescriptor SOLR_PARAM_FIELD_LIST = new PropertyDescriptor + .Builder().name("solr_param_field_list") + .displayName("Field List") + .description("Comma separated list of fields to be included into results, e. g. field1,field2") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SOLR_PARAM_SORT = new PropertyDescriptor + .Builder().name("solr_param_sort") + .displayName("Sorting of result list") + .description("Comma separated sort clauses to define the sorting of results, e. g. field1 asc, field2 desc") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SOLR_PARAM_START = new PropertyDescriptor + .Builder().name("solr_param_start") + .displayName("Start of results") + .description("Offset of result set") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SOLR_PARAM_ROWS = new PropertyDescriptor + .Builder().name("solr_param_rows") + .displayName("Rows") + .description("Number of results to be returned for a single request") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor AMOUNT_DOCUMENTS_TO_RETURN = new PropertyDescriptor + .Builder().name("amount_documents_to_return") + .displayName("Total amount of returned 
results") + .description("Total amount of Solr documents to be returned. If this property is set to \"Only top results\", " + + "only single requests will be sent to Solr and the results will be written into single FlowFiles. If it is set to " + + "\"Entire results\", all results matching to the query are retrieved via multiple Solr requests and " + + "returned in multiple FlowFiles. For both options, the number of Solr documents to be returned in a FlowFile depends on " + + "the configuration of the \"Rows\" property") + .required(true) + .allowableValues(RETURN_ALL_RESULTS, RETURN_TOP_RESULTS) + .defaultValue(RETURN_TOP_RESULTS.getValue()) + .build(); + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value to send for the '" + propertyDescriptorName + "' Solr parameter") + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + } + + public static final Relationship RESULTS = new Relationship.Builder().name("results") + .description("Results of Solr queries").build(); + public static final Relationship FACETS = new Relationship.Builder().name("facets") + .description("Results of faceted search").build(); + public static final Relationship STATS = new Relationship.Builder().name("stats") + .description("Stats about Solr index").build(); + public static final Relationship ORIGINAL = new Relationship.Builder().name("original") + .description("Original flowfile").build(); + public static final Relationship FAILURE = new Relationship.Builder().name("failure") + .description("Failure relationship").build(); + + private Set relationships; + private List descriptors; + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public List 
getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + + final List descriptors = new ArrayList<>(); + descriptors.add(SOLR_TYPE); + descriptors.add(SOLR_LOCATION); + descriptors.add(COLLECTION); + descriptors.add(RETURN_TYPE); + descriptors.add(RECORD_WRITER); + descriptors.add(SOLR_PARAM_QUERY); + descriptors.add(SOLR_PARAM_REQUEST_HANDLER); + descriptors.add(SOLR_PARAM_FIELD_LIST); + descriptors.add(SOLR_PARAM_SORT); + descriptors.add(SOLR_PARAM_START); + descriptors.add(SOLR_PARAM_ROWS); + descriptors.add(AMOUNT_DOCUMENTS_TO_RETURN); + descriptors.add(JAAS_CLIENT_APP_NAME); + descriptors.add(BASIC_USERNAME); + descriptors.add(BASIC_PASSWORD); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(SOLR_SOCKET_TIMEOUT); + descriptors.add(SOLR_CONNECTION_TIMEOUT); + descriptors.add(SOLR_MAX_CONNECTIONS); + descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST); + descriptors.add(ZK_CLIENT_TIMEOUT); + descriptors.add(ZK_CONNECTION_TIMEOUT); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(FAILURE); + relationships.add(RESULTS); + relationships.add(FACETS); + relationships.add(STATS); + relationships.add(ORIGINAL); + this.relationships = Collections.unmodifiableSet(relationships); + } + + public static final Set SUPPORTED_SEARCH_COMPONENTS = new HashSet<>(); + static { + SUPPORTED_SEARCH_COMPONENTS.addAll(Arrays.asList(StatsParams.STATS, FacetParams.FACET)); + } + + public static final Set SEARCH_COMPONENTS_ON = new HashSet<>(); + static { + SEARCH_COMPONENTS_ON.addAll(Arrays.asList("true", "on", "yes")); + } + + @Override + protected final Collection additionalCustomValidation(ValidationContext context) { + final Collection problems = new ArrayList<>(); + + if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) + && 
!context.getProperty(RECORD_WRITER).isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("for writing records a record writer has to be configured") + .valid(false) + .subject("Record writer check") + .build()); + } + return problems; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); + + FlowFile flowFileOriginal = session.get(); + FlowFile flowFileResponse; + + if (flowFileOriginal == null) { + if (context.hasNonLoopConnection()) { + return; + } + flowFileResponse = session.create(); + } else { + flowFileResponse = session.create(flowFileOriginal); + } + + final SolrQuery solrQuery = new SolrQuery(); + final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue()); + final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFileResponse).getValue(); + + final StringBuilder transitUri = new StringBuilder("solr://"); + transitUri.append(getSolrLocation()); + if (isSolrCloud) { + transitUri.append(":").append(collection); + } + final StopWatch timer = new StopWatch(false); + + try { + solrQuery.setQuery(context.getProperty(SOLR_PARAM_QUERY).evaluateAttributeExpressions(flowFileResponse).getValue()); + solrQuery.setRequestHandler(context.getProperty(SOLR_PARAM_REQUEST_HANDLER).evaluateAttributeExpressions(flowFileResponse).getValue()); + + if (context.getProperty(SOLR_PARAM_FIELD_LIST).isSet()) { + for (final String field : context.getProperty(SOLR_PARAM_FIELD_LIST).evaluateAttributeExpressions(flowFileResponse).getValue() + .split(",")) { + solrQuery.addField(field.trim()); + } + } + + // Avoid ArrayIndexOutOfBoundsException due to incorrectly configured sorting + try { + if (context.getProperty(SOLR_PARAM_SORT).isSet()) { + final List sortings = new ArrayList<>(); + for (final String sorting : 
context.getProperty(SOLR_PARAM_SORT).evaluateAttributeExpressions(flowFileResponse).getValue() + .split(",")) { + final String[] sortEntry = sorting.trim().split(" "); + sortings.add(new SolrQuery.SortClause(sortEntry[0], sortEntry[1])); + } + solrQuery.setSorts(sortings); + } + } catch (Exception e) { + throw new ProcessException("Error while parsing the sort clauses for the Solr query"); + } + + final Integer startParam = context.getProperty(SOLR_PARAM_START).isSet() ? Integer.parseInt( + context.getProperty(SOLR_PARAM_START).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.START_DEFAULT; + + solrQuery.setStart(startParam); + + final Integer rowParam = context.getProperty(SOLR_PARAM_ROWS).isSet() ? Integer.parseInt( + context.getProperty(SOLR_PARAM_ROWS).evaluateAttributeExpressions(flowFileResponse).getValue()) : CommonParams.ROWS_DEFAULT; + + solrQuery.setRows(rowParam); + + final Map additionalSolrParams = SolrUtils.getRequestParams(context, flowFileResponse); + + final Set searchComponents = extractSearchComponents(additionalSolrParams); + solrQuery.add(new MultiMapSolrParams(additionalSolrParams)); + + final Map attributes = new HashMap<>(); + attributes.put(ATTRIBUTE_SOLR_CONNECT, getSolrLocation()); + if (isSolrCloud) { + attributes.put(ATTRIBUTE_SOLR_COLLECTION, collection); + } + attributes.put(ATTRIBUTE_SOLR_QUERY, solrQuery.toString()); + if (flowFileOriginal != null) { + flowFileOriginal = session.putAllAttributes(flowFileOriginal, attributes); + } + + flowFileResponse = session.putAllAttributes(flowFileResponse, attributes); + + final boolean getEntireResults = RETURN_ALL_RESULTS.equals(context.getProperty(AMOUNT_DOCUMENTS_TO_RETURN).getValue()); + boolean processFacetsAndStats = true; + boolean continuePaging = true; + + while (continuePaging){ + + timer.start(); + + Map responseAttributes = new HashMap<>(); + responseAttributes.put(ATTRIBUTE_SOLR_START, solrQuery.getStart().toString()); + 
responseAttributes.put(ATTRIBUTE_SOLR_ROWS, solrQuery.getRows().toString()); + + if (solrQuery.getStart() > UPPER_LIMIT_START_PARAM) { + logger.warn("The start parameter of Solr query {} exceeded the upper limit of {}. The query will not be processed " + + "to avoid performance or memory issues on the part of Solr.", new Object[]{solrQuery.toString(), UPPER_LIMIT_START_PARAM}); + flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes); + timer.stop(); + break; + } + + final QueryRequest req = new QueryRequest(solrQuery); + if (isBasicAuthEnabled()) { + req.setBasicAuthCredentials(getUsername(), getPassword()); + } + + final QueryResponse response = req.process(getSolrClient()); + timer.stop(); + + final Long totalNumberOfResults = response.getResults().getNumFound(); + + responseAttributes.put(ATTRIBUTE_SOLR_NUMBER_RESULTS, totalNumberOfResults.toString()); + responseAttributes.put(ATTRIBUTE_CURSOR_MARK, response.getNextCursorMark()); + responseAttributes.put(ATTRIBUTE_SOLR_STATUS, String.valueOf(response.getStatus())); + responseAttributes.put(ATTRIBUTE_QUERY_TIME, String.valueOf(response.getQTime())); + flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes); + + if (response.getResults().size() > 0) { + + if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){ + flowFileResponse = session.write(flowFileResponse, SolrUtils.getOutputStreamCallbackToTransformSolrResponseToXml(response)); + flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_XML); + } else { + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions(flowFileResponse) + .asControllerService(RecordSetWriterFactory.class); + final RecordSchema schema = writerFactory.getSchema(flowFileResponse.getAttributes(), null); + final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema); + final StringBuffer 
mimeType = new StringBuffer(); + flowFileResponse = session.write(flowFileResponse, out -> { + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) { + writer.write(recordSet); + writer.flush(); + mimeType.append(writer.getMimeType()); + } catch (SchemaNotFoundException e) { + throw new ProcessException("Could not parse Solr response", e); + } + }); + flowFileResponse = session.putAttribute(flowFileResponse, CoreAttributes.MIME_TYPE.key(), mimeType.toString()); + } + + if (processFacetsAndStats) { + if (searchComponents.contains(FacetParams.FACET)) { + FlowFile flowFileFacets = session.create(flowFileResponse); + flowFileFacets = session.write(flowFileFacets, out -> { + try ( + final OutputStreamWriter osw = new OutputStreamWriter(out); + final JsonWriter writer = new JsonWriter(osw) + ) { + addFacetsFromSolrResponseToJsonWriter(response, writer); + } + }); + flowFileFacets = session.putAttribute(flowFileFacets, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON); + session.getProvenanceReporter().receive(flowFileFacets, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFileFacets, FACETS); + } + + if (searchComponents.contains(StatsParams.STATS)) { + FlowFile flowFileStats = session.create(flowFileResponse); + flowFileStats = session.write(flowFileStats, out -> { + try ( + final OutputStreamWriter osw = new OutputStreamWriter(out); + final JsonWriter writer = new JsonWriter(osw) + ) { + addStatsFromSolrResponseToJsonWriter(response, writer); + } + }); + flowFileStats = session.putAttribute(flowFileStats, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON); + session.getProvenanceReporter().receive(flowFileStats, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFileStats, STATS); + } + processFacetsAndStats = false; + } + } + + if (getEntireResults) { + final Integer totalDocumentsReturned = solrQuery.getStart() + solrQuery.getRows(); + if 
(totalDocumentsReturned < totalNumberOfResults) { + solrQuery.setStart(totalDocumentsReturned); + session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFileResponse, RESULTS); + flowFileResponse = session.create(flowFileResponse); + } else { + continuePaging = false; + } + } else { + continuePaging = false; + } + } + + } catch (Exception e) { + flowFileResponse = session.penalize(flowFileResponse); + flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION, e.getClass().getName()); + flowFileResponse = session.putAttribute(flowFileResponse, EXCEPTION_MESSAGE, e.getMessage()); + session.transfer(flowFileResponse, FAILURE); + logger.error("Failed to execute query {} due to {}. FlowFile will be routed to relationship failure", new Object[]{solrQuery.toString(), e}, e); + if (flowFileOriginal != null) { + flowFileOriginal = session.penalize(flowFileOriginal); + } + } + + if (!flowFileResponse.isPenalized()) { + session.getProvenanceReporter().receive(flowFileResponse, transitUri.toString(), timer.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFileResponse, RESULTS); + } + + if (flowFileOriginal != null) { + if (!flowFileOriginal.isPenalized()) { + session.transfer(flowFileOriginal, ORIGINAL); + } else { + session.remove(flowFileOriginal); + } + } + } + + private Set extractSearchComponents(Map solrParams) { + final Set searchComponentsTemp = new HashSet<>(); + for (final String searchComponent : SUPPORTED_SEARCH_COMPONENTS) + if (solrParams.keySet().contains(searchComponent)) { + if (SEARCH_COMPONENTS_ON.contains(solrParams.get(searchComponent)[0])) { + searchComponentsTemp.add(searchComponent); + } + } + return Collections.unmodifiableSet(searchComponentsTemp); + } + + private static void addStatsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException { + writer.beginObject(); + writer.name("stats_fields"); 
+ writer.beginObject(); + for (Map.Entry entry: response.getFieldStatsInfo().entrySet()) { + FieldStatsInfo fsi = entry.getValue(); + writer.name(entry.getKey()); + writer.beginObject(); + writer.name("min").value(fsi.getMin().toString()); + writer.name("max").value(fsi.getMax().toString()); + writer.name("count").value(fsi.getCount()); + writer.name("missing").value(fsi.getMissing()); + writer.name("sum").value(fsi.getSum().toString()); + writer.name("mean").value(fsi.getMean().toString()); + writer.name("sumOfSquares").value(fsi.getSumOfSquares()); + writer.name("stddev").value(fsi.getStddev()); + writer.endObject(); + } + writer.endObject(); + writer.endObject(); + } + + private static void addFacetsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException { + writer.beginObject(); + writer.name("facet_queries"); + writer.beginArray(); + for (final Map.Entry facetQuery : response.getFacetQuery().entrySet()){ + writer.beginObject(); + writer.name("facet").value(facetQuery.getKey()); + writer.name("count").value(facetQuery.getValue()); + writer.endObject(); + } + writer.endArray(); + + writer.name("facet_fields"); + writer.beginObject(); + for (final FacetField facetField : response.getFacetFields()){ + writer.name(facetField.getName()); + writer.beginArray(); + for (final FacetField.Count count : facetField.getValues()) { + writer.beginObject(); + writer.name("facet").value(count.getName()); + writer.name("count").value(count.getCount()); + writer.endObject(); + } + writer.endArray(); + } + writer.endObject(); + + writer.name("facet_ranges"); + writer.beginObject(); + for (final RangeFacet rangeFacet : response.getFacetRanges()) { + writer.name(rangeFacet.getName()); + writer.beginArray(); + final List list = rangeFacet.getCounts(); + for (final Count count : list) { + writer.beginObject(); + writer.name("facet").value(count.getValue()); + writer.name("count").value(count.getCount()); + writer.endObject(); + } + 
writer.endArray(); + } + writer.endObject(); + + writer.name("facet_intervals"); + writer.beginObject(); + for (final IntervalFacet intervalFacet : response.getIntervalFacets()) { + writer.name(intervalFacet.getField()); + writer.beginArray(); + for (final IntervalFacet.Count count : intervalFacet.getIntervals()) { + writer.beginObject(); + writer.name("facet").value(count.getKey()); + writer.name("count").value(count.getCount()); + writer.endObject(); + } + writer.endArray(); + } + writer.endObject(); + writer.endObject(); + } +} + + diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java index 6a7e438b74..ae83b1c0e0 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java @@ -28,8 +28,11 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.ListRecordSet; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; @@ -48,15 +51,19 @@ import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; +import 
org.apache.solr.common.params.MultiMapSolrParams; import javax.net.ssl.SSLContext; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; public class SolrUtils { @@ -67,6 +74,15 @@ public class SolrUtils { public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( "Standard", "Standard", "A stand-alone Solr instance."); + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor + .Builder().name("Record Writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to write Solr documents to FlowFiles. Must be set if \"Records\" is used as return type.") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .build(); + public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor .Builder().name("Solr Type") .description("The type of Solr instance, Cloud or Standard.") @@ -176,6 +192,8 @@ public class SolrUtils { .defaultValue("10 seconds") .build(); + public static final String REPEATING_PARAM_PATTERN = "[\\w\\.]+\\.\\d+$"; + public static SolrClient createSolrClient(final PropertyContext context, final String solrLocation) { final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); @@ -220,8 +238,6 @@ public class SolrUtils { } } - - /** * Writes each SolrDocument to a record. 
*/ @@ -245,7 +261,6 @@ public class SolrUtils { return new ListRecordSet(schema, lr); } - public static OutputStreamCallback getOutputStreamCallbackToTransformSolrResponseToXml(QueryResponse response) { return new QueryResponseOutputStreamCallback(response); } @@ -281,5 +296,33 @@ public class SolrUtils { } } + public static Map getRequestParams(ProcessContext context, FlowFile flowFile) { + final Map paramsMap = new HashMap<>(); + final SortedMap repeatingParams = new TreeMap<>(); + for (final Map.Entry entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + final String paramName = descriptor.getName(); + final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); + + if (!paramValue.trim().isEmpty()) { + if (paramName.matches(REPEATING_PARAM_PATTERN)) { + repeatingParams.put(paramName, paramValue); + } else { + MultiMapSolrParams.addParam(paramName, paramValue, paramsMap); + } + } + } + } + + for (final Map.Entry entry : repeatingParams.entrySet()) { + final String paramName = entry.getKey(); + final String paramValue = entry.getValue(); + final int idx = paramName.lastIndexOf("."); + MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap); + } + + return paramsMap; + } } diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 657d0e8433..cc05423cd7 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,3 +14,4 @@ # limitations under the License. 
org.apache.nifi.processors.solr.PutSolrContentStream org.apache.nifi.processors.solr.GetSolr +org.apache.nifi.processors.solr.QuerySolr diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html new file mode 100755 index 0000000000..d8b96e3341 --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org/apache/nifi/processors/solr/QuerySolr/additionalDetails.html @@ -0,0 +1,142 @@ + + + + + + QuerySolr + + + + +

Usage Example

+ +

+ This processor queries Solr and writes results to FlowFiles. The processor can be used both at the + beginning of a dataflow and within an existing flow. Solr results can be written to FlowFiles as Solr XML or using + records functions (supporting CSV, JSON, etc.). Additionally, facets and stats can be retrieved. + They are written to FlowFiles in JSON and sent to designated relationships.

+

+ The processor can be configured to retrieve either only the top results or full result sets. However, + it should be emphasized that this processor is not designed to export large result sets from Solr. + If the processor is configured to return full result sets, the configured number of rows per + request will be used as batch size and the processor will iteratively increase the start parameter + returning results in one FlowFile per request. The processor will stop iterating through results as + soon as the start parameter exceeds 10000. For exporting large result sets, consider using the + GetSolr processor instead. In principle, it is also possible to embed this processor into a + dataflow iterating through results making use of the attribute solr.cursor.mark that is added to FlowFiles + for each request. Notice that the usage of Solr's cursor mark requires queries to fulfil several preconditions + (see the Solr documentation on deep paging for additional details).

+ +

+ The most common Solr parameters can be defined via processor properties. Other parameters have to be set via + dynamic properties. +

+ +

+ Parameters that can be set multiple times also have to be defined as dynamic properties + (e.g. fq, facet.field, stats.field). If these parameters must be set multiple times with different values, + properties can follow a naming convention: + name.number, where name is the parameter name and number is a unique number. + Repeating parameters will be sorted by their property name.

+ +

+ Example: Defining the fq parameter multiple times +

+ + + + + + + + + + + + + + + + + + +
Property NameProperty Value
fq.1field1:value1
fq.2field2:value2
fq.3field3:value3
+ +

+ This definition will be appended to the Solr URL as follows: + fq=field1:value1&fq=field2:value2&fq=field3:value3 +

+ +

+ Facets and stats can be activated by setting the respective Solr parameters as dynamic properties. Example:

+ + + + + + + + + + + + + + + + + + + + + + +
Property NameProperty Value
facettrue
facet.fieldfieldname
statstrue
stats.fieldfieldname
+ +

+ Multiple fields for facets or stats can be defined in the same way as described for multiple filter queries:

+ + + + + + + + + + + + + + + + + + +
Property NameProperty Value
facettrue
facet.field.1firstField
facet.field.2secondField
+ +

+ This definition will be appended to the Solr URL as follows: + facet=true&facet.field=firstField&facet.field=secondField +

+ + + diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java new file mode 100755 index 0000000000..cede9a57f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java @@ -0,0 +1,640 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.common.SolrInputDocument; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class QuerySolrIT { + /* + + This integration test expects a Solr instance running locally in SolrCloud mode, coordinated by a single ZooKeeper + instance accessible with the ZooKeeper-Connect-String "localhost:2181". 
+ + */ + + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); + private static final SimpleDateFormat DATE_FORMAT_SOLR_COLLECTION = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss", Locale.US); + private static String SOLR_COLLECTION; + private static String ZK_CONFIG_PATH; + private static String ZK_CONFIG_NAME; + private static String SOLR_LOCATION = "localhost:2181"; + + static { + DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT")); + Date date = new Date(); + SOLR_COLLECTION = DATE_FORMAT_SOLR_COLLECTION.format(date) + "_QuerySolrIT"; + ZK_CONFIG_PATH = "src/test/resources/solr/testCollection/conf"; + ZK_CONFIG_NAME = "QuerySolrIT_config"; + } + + @BeforeClass + public static void setup() throws IOException, SolrServerException { + CloudSolrClient solrClient = createSolrClient(); + Path currentDir = Paths.get(ZK_CONFIG_PATH); + solrClient.uploadConfig(currentDir, ZK_CONFIG_NAME); + solrClient.setDefaultCollection(SOLR_COLLECTION); + + if (!solrClient.getZkStateReader().getClusterState().hasCollection(SOLR_COLLECTION)) { + CollectionAdminRequest.Create createCollection = CollectionAdminRequest.createCollection(SOLR_COLLECTION, ZK_CONFIG_NAME, 1, 1); + createCollection.process(solrClient); + } else { + solrClient.deleteByQuery("*:*"); + } + + for (int i = 0; i < 10; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "doc" + i); + Date date = new Date(); + doc.addField("created", DATE_FORMAT.format(date)); + doc.addField("string_single", "single" + i + ".1"); + doc.addField("string_multi", "multi" + i + ".1"); + doc.addField("string_multi", "multi" + i + ".2"); + doc.addField("integer_single", i); + doc.addField("integer_multi", 1); + doc.addField("integer_multi", 2); + doc.addField("integer_multi", 3); + doc.addField("double_single", 0.5 + i); + + solrClient.add(doc); + } + solrClient.commit(); + } + + public static CloudSolrClient createSolrClient() { + CloudSolrClient 
solrClient = null; + + try { + solrClient = new CloudSolrClient.Builder().withZkHost(SOLR_LOCATION).build(); + solrClient.setDefaultCollection(SOLR_COLLECTION); + } catch (Exception e) { + e.printStackTrace(); + } + return solrClient; + } + + @AfterClass + public static void teardown() { + try { + CloudSolrClient solrClient = createSolrClient(); + CollectionAdminRequest.Delete deleteCollection = CollectionAdminRequest.deleteCollection(SOLR_COLLECTION); + deleteCollection.process(solrClient); + solrClient.close(); + } catch (Exception e) { + } + } + + private TestRunner createRunnerWithSolrClient(SolrClient solrClient) { + final TestableProcessor proc = new TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:2181"); + runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION); + + return runner; + } + + @Test + public void testAllFacetCategories() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("facet.field", "integer_multi"); + runner.setProperty("facet.interval", "integer_single"); + runner.setProperty("facet.interval.set.1", "[4,7]"); + runner.setProperty("facet.interval.set.2", "[5,7]"); + runner.setProperty("facet.range", "created"); + runner.setProperty("facet.range.start", "NOW/MINUTE"); + runner.setProperty("facet.range.end", "NOW/MINUTE+1MINUTE"); + runner.setProperty("facet.range.gap", "+20SECOND"); + runner.setProperty("facet.query.1", "*:*"); + runner.setProperty("facet.query.2", "integer_multi:2"); + runner.setProperty("facet.query.3", "integer_multi:3"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + runner.assertTransferCount(QuerySolr.FACETS, 1); + + JsonReader reader = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0))))); + reader.beginObject(); + while (reader.hasNext()) { + String name = reader.nextName(); + if (name.equals("facet_queries")) { + assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader)); + } else if (name.equals("facet_fields")) { + reader.beginObject(); + assertEquals(reader.nextName(), "integer_multi"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 30); + reader.endObject(); + } else if (name.equals("facet_ranges")) { + reader.beginObject(); + assertEquals(reader.nextName(), "created"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 10); + reader.endObject(); + } else if (name.equals("facet_intervals")) { + reader.beginObject(); + assertEquals(reader.nextName(), "integer_single"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 7); + reader.endObject(); + } + } + reader.endObject(); + reader.close(); + solrClient.close(); + } + + private int returnCheckSumForArrayOfJsonObjects(JsonReader reader) throws IOException { + int checkSum = 0; + reader.beginArray(); + while (reader.hasNext()) { + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("count")) { + checkSum += reader.nextInt(); + } else { + reader.skipValue(); + } + } + reader.endObject(); + } + reader.endArray(); + return checkSum; + } + + @Test + public void testFacetTrueButNull() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + // Check for empty nestet Objects in JSON + JsonReader reader = new JsonReader(new InputStreamReader(new 
ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0))))); + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("facet_queries")) { + reader.beginArray(); + assertFalse(reader.hasNext()); + reader.endArray(); + } else { + reader.beginObject(); + assertFalse(reader.hasNext()); + reader.endObject(); + } + } + reader.endObject(); + + JsonReader reader_stats = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0))))); + reader_stats.beginObject(); + assertEquals(reader_stats.nextName(), "stats_fields"); + reader_stats.beginObject(); + assertFalse(reader_stats.hasNext()); + reader_stats.endObject(); + reader_stats.endObject(); + + reader.close(); + reader_stats.close(); + solrClient.close(); + } + + @Test + public void testStats() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("stats", "true"); + runner.setProperty("stats.field", "integer_single"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + + runner.assertTransferCount(QuerySolr.STATS, 1); + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0))))); + reader.beginObject(); + assertEquals(reader.nextName(), "stats_fields"); + reader.beginObject(); + assertEquals(reader.nextName(), "integer_single"); + reader.beginObject(); + while (reader.hasNext()) { + String name = reader.nextName(); + switch (name) { + case "min": assertEquals(reader.nextString(), "0.0"); break; + case "max": assertEquals(reader.nextString(), "9.0"); break; + case "count": assertEquals(reader.nextInt(), 10); break; + case "sum": assertEquals(reader.nextString(), "45.0"); break; + default: reader.skipValue(); 
break; + } + } + reader.endObject(); + reader.endObject(); + reader.endObject(); + + reader.close(); + solrClient.close(); + } + + @Test + public void testRelationshipRoutings() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + // Set request handler for request failure + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler"); + + // Processor has no input connection and fails + runner.setNonLoopConnection(false); + runner.run(1, false); + runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE); + runner.clearTransferState(); + + // Processor has an input connection and fails + runner.setNonLoopConnection(true); + runner.enqueue(new byte[0]); + runner.run(1, false); + runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE); + runner.clearTransferState(); + + // Set request handler for successful request + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/select"); + + // Processor has no input connection and succeeds + runner.setNonLoopConnection(false); + runner.run(1, false); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + 
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + runner.clearTransferState(); + + // Processor has an input connection and succeeds + runner.setNonLoopConnection(true); + runner.enqueue(new byte[0]); + runner.run(1, true); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + runner.assertAllFlowFilesContainAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + runner.clearTransferState(); + + solrClient.close(); + } + + @Test + public void testExpressionLanguageForProperties() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); + 
runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "${query}"); + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "${handler}"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "${fields}"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "${sort}"); + runner.setProperty(QuerySolr.SOLR_PARAM_START, "${start}"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "${rows}"); + + runner.enqueue(new byte[0], new HashMap(){{ + put("query", "id:(doc0 OR doc1 OR doc2 OR doc3)"); + put("handler", "/select"); + put("fields", "id"); + put("sort", "id desc"); + put("start", "1"); + put("rows", "2"); + }}); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "doc2doc1"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + @Test + public void testSingleFilterQuery() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + runner.setProperty("fq", "id:(doc2 OR doc3)"); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "doc2doc3"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + + @Test + public void testMultipleFilterQueries() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + runner.setProperty("fq.1", "id:(doc0 OR doc1 OR doc2 OR doc3)"); + runner.setProperty("fq.2", "id:(doc1 OR doc2 
OR doc3 OR doc4)"); + runner.setProperty("fq.3", "id:(doc2 OR doc3 OR doc4 OR doc5)"); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "doc2doc3"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + @Test + public void testStandardResponse() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:(doc0 OR doc1)"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id desc"); + + runner.setNonLoopConnection(false); + runner.run(); + runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + + String expectedXml = "doc1doc0"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile)))); + + solrClient.close(); + } + + @Test + public void testPreserveOriginalContent() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:doc0"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + String content = "test content 123"; + + runner.enqueue(content); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + + String expectedXml = "doc0"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + assertEquals(content, new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0)))); + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "2"); + runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 5); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + runner.assertTransferCount(QuerySolr.STATS, 0); + runner.assertTransferCount(QuerySolr.FACETS, 0); + + List flowFiles = runner.getFlowFilesForRelationship(QuerySolr.RESULTS); + Integer documentCounter = 0; + Integer startParam = 0; + + for (MockFlowFile flowFile : flowFiles) { + Map attributes = flowFile.getAttributes(); + assertEquals(attributes.get(QuerySolr.ATTRIBUTE_SOLR_START), startParam.toString()); + startParam += 2; + + StringBuffer expectedXml = new StringBuffer() + .append("doc") + .append(documentCounter++) + .append("doc") + .append(documentCounter++) + .append(""); + assertThat(expectedXml.toString(), CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile)))); + } + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults2() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3"); + 
runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 4); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults3() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3"); + runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.setNonLoopConnection(false); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 4); + runner.assertTransferCount(QuerySolr.ORIGINAL, 0); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + solrClient.close(); + } + + + @Test + public void testRecordResponse() throws IOException, InitializationException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.RETURN_TYPE, QuerySolr.MODE_REC.getValue()); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id,created,integer_single"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "10"); + + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc"))); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + runner.setProperty(SolrUtils.RECORD_WRITER, "writer"); + + runner.setNonLoopConnection(false); + + runner.run(1); + runner.assertQueueEmpty(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + reader.beginArray(); + int controlScore = 0; + while (reader.hasNext()) { + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("integer_single")) { + controlScore += reader.nextInt(); + } else { + reader.skipValue(); + } + } + reader.endObject(); + } + reader.close(); + solrClient.close(); + + assertEquals(controlScore, 45); + } + + // Override createSolrClient and return the passed in SolrClient + private class TestableProcessor extends QuerySolr { + private SolrClient solrClient; + + public TestableProcessor(SolrClient solrClient) { + this.solrClient = solrClient; + } + @Override + protected SolrClient createSolrClient(ProcessContext context, String solrLocation) { + return solrClient; + } + } +} diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java index af5f3dd41e..55d6d25a0d 100755 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java +++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Paths; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; @@ -54,6 +55,13 @@ public class TestGetSolr { static final String DEFAULT_SOLR_CORE = "testCollection"; + final static SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); + final static String DATE_STRING_EARLIER = "1970-01-01T00:00:00.000Z"; + final static String DATE_STRING_LATER = "1970-01-01T00:00:00.001Z"; + static { + DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT")); + } + private SolrClient solrClient; @Before @@ -62,16 +70,18 @@ public class TestGetSolr { try { // create an EmbeddedSolrServer for the processor to use - String relPath = getClass().getProtectionDomain().getCodeSource() + final String relPath = getClass().getProtectionDomain().getCodeSource() .getLocation().getFile() + "../../target"; solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, DEFAULT_SOLR_CORE, relPath); + final Date date = DATE_FORMAT.parse(DATE_STRING_EARLIER); + for (int i = 0; i < 10; i++) { SolrInputDocument doc = new SolrInputDocument(); doc.addField("id", "doc" + i); - doc.addField("created", new Date()); + doc.addField("created", date); doc.addField("string_single", "single" + i + ".1"); doc.addField("string_multi", "multi" + i + ".1"); doc.addField("string_multi", "multi" + i + ".2"); @@ -210,23 +220,20 @@ public class TestGetSolr { } @Test - public void testInitialDateFilter() throws IOException, SolrServerException { - final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); - df.setTimeZone(TimeZone.getTimeZone("GMT")); - final Date dateToFilter = new Date(); - + public void 
testInitialDateFilter() throws IOException, SolrServerException, ParseException { + final Date dateToFilter = DATE_FORMAT.parse(DATE_STRING_LATER); final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); TestRunner runner = createDefaultTestRunner(proc); - runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter)); + runner.setProperty(GetSolr.DATE_FILTER, DATE_FORMAT.format(dateToFilter)); runner.setProperty(GetSolr.BATCH_SIZE, "1"); SolrInputDocument doc10 = new SolrInputDocument(); doc10.addField("id", "doc10"); - doc10.addField("created", new Date()); + doc10.addField("created", dateToFilter); SolrInputDocument doc11 = new SolrInputDocument(); doc11.addField("id", "doc11"); - doc11.addField("created", new Date()); + doc11.addField("created", dateToFilter); solrClient.add(doc10); solrClient.add(doc11); @@ -292,7 +299,7 @@ public class TestGetSolr { } @Test - public void testRecordWriter() throws IOException, SolrServerException, InitializationException { + public void testRecordWriter() throws IOException, InitializationException { final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); TestRunner runner = createDefaultTestRunner(proc); @@ -309,7 +316,7 @@ public class TestGetSolr { runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); runner.enableControllerService(jsonWriter); - runner.setProperty(GetSolr.RECORD_WRITER, "writer"); + runner.setProperty(SolrUtils.RECORD_WRITER, "writer"); runner.run(1,true, true); runner.assertQueueEmpty(); diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java new file mode 100755 index 0000000000..0f5fb7e1f4 --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java @@ -0,0 +1,790 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.Assert; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestQuerySolr { + static final String DEFAULT_SOLR_CORE = "testCollection"; + static final String SOLR_CONNECT = "http://localhost:8443/solr"; + + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); + static { + DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT")); + } + + private SolrClient solrClient; + + public SolrClient createSolrClient() { + try { + // create an EmbeddedSolrServer for the processor to use + String relPath = getClass().getProtectionDomain().getCodeSource() + .getLocation().getFile() + "../../target"; + + solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, + DEFAULT_SOLR_CORE, relPath); + + for (int i = 0; i < 10; i++) { + 
SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "doc" + i); + Date date = new Date(); + doc.addField("created", DATE_FORMAT.format(date)); + doc.addField("string_single", "single" + i + ".1"); + doc.addField("string_multi", "multi" + i + ".1"); + doc.addField("string_multi", "multi" + i + ".2"); + doc.addField("integer_single", i); + doc.addField("integer_multi", 1); + doc.addField("integer_multi", 2); + doc.addField("integer_multi", 3); + doc.addField("double_single", 0.5 + i); + + solrClient.add(doc); + } + solrClient.commit(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + return solrClient; + } + + private TestRunner createRunnerWithSolrClient(SolrClient solrClient) { + final TestableProcessor proc = new TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT); + runner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE); + + return runner; + } + + private TestRunner createRunnerWithSolrCloudClient(SolrClient solrClient) { + final TestableProcessor proc = new TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT); + runner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE); + + return runner; + } + + private TestRunner createRunner() { + final TestableProcessor proc = new TestableProcessor(null); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_CONNECT); + runner.setProperty(SolrUtils.COLLECTION, DEFAULT_SOLR_CORE); + + return runner; + } + + @Test + public void testRepeatingParams() { + TestRunner runner = 
createRunner(); + runner.enqueue(new byte[0]); + + runner.setProperty("facet.field.1", "123"); + runner.setProperty("facet.field.2", "other_field"); + runner.setProperty("f.123.facet.prefix", "pre"); + + ProcessContext context = runner.getProcessContext(); + FlowFile flowFile = runner.getProcessSessionFactory().createSession().get(); + + Map solrParams = SolrUtils.getRequestParams(context, flowFile); + + String[] facet_fields = solrParams.get("facet.field"); + assertEquals(2, facet_fields.length); + assertEquals("123", facet_fields[0]); + assertEquals("other_field", facet_fields[1]); + assertEquals("pre", solrParams.get("f.123.facet.prefix")[0]); + } + + @Test + public void testAllFacetCategories() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("facet.field", "integer_multi"); + runner.setProperty("facet.interval", "integer_single"); + runner.setProperty("facet.interval.set.1", "[4,7]"); + runner.setProperty("facet.interval.set.2", "[5,7]"); + runner.setProperty("facet.range", "created"); + runner.setProperty("facet.range.start", "NOW/MINUTE"); + runner.setProperty("facet.range.end", "NOW/MINUTE+1MINUTE"); + runner.setProperty("facet.range.gap", "+20SECOND"); + runner.setProperty("facet.query.1", "*:*"); + runner.setProperty("facet.query.2", "integer_multi:2"); + runner.setProperty("facet.query.3", "integer_multi:3"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + runner.assertTransferCount(QuerySolr.FACETS, 1); + + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0))))); + reader.beginObject(); + while (reader.hasNext()) { + String name = reader.nextName(); + if (name.equals("facet_queries")) { + assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader)); + } else if 
(name.equals("facet_fields")) { + reader.beginObject(); + assertEquals(reader.nextName(), "integer_multi"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 30); + reader.endObject(); + } else if (name.equals("facet_ranges")) { + reader.beginObject(); + assertEquals(reader.nextName(), "created"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 10); + reader.endObject(); + } else if (name.equals("facet_intervals")) { + reader.beginObject(); + assertEquals(reader.nextName(), "integer_single"); + assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 7); + reader.endObject(); + } + } + reader.endObject(); + reader.close(); + solrClient.close(); + } + + private int returnCheckSumForArrayOfJsonObjects(JsonReader reader) throws IOException { + int checkSum = 0; + reader.beginArray(); + while (reader.hasNext()) { + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("count")) { + checkSum += reader.nextInt(); + } else { + reader.skipValue(); + } + } + reader.endObject(); + } + reader.endArray(); + return checkSum; + } + + @Test + public void testFacetTrueButNull() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + // Check for empty nestet Objects in JSON + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0))))); + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("facet_queries")) { + reader.beginArray(); + assertFalse(reader.hasNext()); + reader.endArray(); + } else { + 
reader.beginObject(); + assertFalse(reader.hasNext()); + reader.endObject(); + } + } + reader.endObject(); + + JsonReader reader_stats = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0))))); + reader_stats.beginObject(); + assertEquals(reader_stats.nextName(), "stats_fields"); + reader_stats.beginObject(); + assertFalse(reader_stats.hasNext()); + reader_stats.endObject(); + reader_stats.endObject(); + + reader.close(); + reader_stats.close(); + solrClient.close(); + } + + @Test + public void testStats() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty("stats", "true"); + runner.setProperty("stats.field", "integer_single"); + + runner.enqueue(new ByteArrayInputStream(new byte[0])); + runner.run(); + + runner.assertTransferCount(QuerySolr.STATS, 1); + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0))))); + reader.beginObject(); + assertEquals(reader.nextName(), "stats_fields"); + reader.beginObject(); + assertEquals(reader.nextName(), "integer_single"); + reader.beginObject(); + while (reader.hasNext()) { + String name = reader.nextName(); + switch (name) { + case "min": assertEquals(reader.nextString(), "0.0"); break; + case "max": assertEquals(reader.nextString(), "9.0"); break; + case "count": assertEquals(reader.nextInt(), 10); break; + case "sum": assertEquals(reader.nextString(), "45.0"); break; + default: reader.skipValue(); break; + } + } + reader.endObject(); + reader.endObject(); + reader.endObject(); + + reader.close(); + solrClient.close(); + } + + @Test + public void testRelationshipRoutings() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + 
runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + // Set request handler for request failure + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler"); + + // Processor has no input connection and fails + runner.setNonLoopConnection(false); + runner.run(1, false); + runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE); + runner.clearTransferState(); + + // Processor has an input connection and fails + runner.setNonLoopConnection(true); + runner.enqueue(new byte[0]); + runner.run(1, false); + runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE); + runner.clearTransferState(); + + // Set request handler for successful request + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/select"); + + // Processor has no input connection and succeeds + runner.setNonLoopConnection(false); + runner.run(1, false); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + runner.clearTransferState(); + + // Processor has an input connection and succeeds + runner.setNonLoopConnection(true); + runner.enqueue(new byte[0]); + runner.run(1, true); + 
runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + runner.assertAllFlowFilesContainAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + runner.clearTransferState(); + + solrClient.close(); + } + + @Test + public void testExpressionLanguageForProperties() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "${query}"); + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "${handler}"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "${fields}"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "${sort}"); + runner.setProperty(QuerySolr.SOLR_PARAM_START, "${start}"); + 
runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "${rows}"); + + runner.enqueue(new byte[0], new HashMap(){{ + put("query", "id:(doc0 OR doc1 OR doc2 OR doc3)"); + put("handler", "/select"); + put("fields", "id"); + put("sort", "id desc"); + put("start", "1"); + put("rows", "2"); + }}); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "doc2doc1"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + @Test + public void testSingleFilterQuery() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + runner.setProperty("fq", "id:(doc2 OR doc3)"); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "doc2doc3"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + + @Test + public void testMultipleFilterQueries() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + runner.setProperty("fq.1", "id:(doc0 OR doc1 OR doc2 OR doc3)"); + runner.setProperty("fq.2", "id:(doc1 OR doc2 OR doc3 OR doc4)"); + runner.setProperty("fq.3", "id:(doc2 OR doc3 OR doc4 OR doc5)"); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + String expectedXml = "doc2doc3"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new 
String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + + solrClient.close(); + } + + @Test + public void testStandardResponse() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:(doc0 OR doc1)"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id desc"); + + runner.setNonLoopConnection(false); + runner.run(); + runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + + String expectedXml = "doc1doc0"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile)))); + + solrClient.close(); + } + + @Test + public void testPreserveOriginalContent() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:doc0"); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + + String content = "test content 123"; + + runner.enqueue(content); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + + String expectedXml = "doc0"; + assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + assertEquals(content, new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0)))); + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults() throws 
IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "2"); + runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(QuerySolr.RESULTS, 5); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + runner.assertTransferCount(QuerySolr.STATS, 0); + runner.assertTransferCount(QuerySolr.FACETS, 0); + + List flowFiles = runner.getFlowFilesForRelationship(QuerySolr.RESULTS); + Integer documentCounter = 0; + Integer startParam = 0; + + for (MockFlowFile flowFile : flowFiles) { + Map attributes = flowFile.getAttributes(); + assertEquals(attributes.get(QuerySolr.ATTRIBUTE_SOLR_START), startParam.toString()); + startParam += 2; + + StringBuffer expectedXml = new StringBuffer() + .append("doc") + .append(documentCounter++) + .append("doc") + .append(documentCounter++) + .append(""); + assertThat(expectedXml.toString(), CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile)))); + } + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults2() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3"); + runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 4); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + 
runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + solrClient.close(); + } + + @Test + public void testRetrievalOfFullResults3() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id"); + runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3"); + runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS); + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.setNonLoopConnection(false); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 4); + runner.assertTransferCount(QuerySolr.ORIGINAL, 0); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + + solrClient.close(); + } + + + @Test + public void testRecordResponse() throws IOException, InitializationException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.RETURN_TYPE, QuerySolr.MODE_REC.getValue()); + runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id,created,integer_single"); + runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "10"); + + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc"))); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + 
runner.setProperty(SolrUtils.RECORD_WRITER, "writer"); + + runner.setNonLoopConnection(false); + + runner.run(1); + runner.assertQueueEmpty(); + runner.assertTransferCount(QuerySolr.RESULTS, 1); + + JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream( + runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0))))); + reader.beginArray(); + int controlScore = 0; + while (reader.hasNext()) { + reader.beginObject(); + while (reader.hasNext()) { + if (reader.nextName().equals("integer_single")) { + controlScore += reader.nextInt(); + } else { + reader.skipValue(); + } + } + reader.endObject(); + } + reader.close(); + solrClient.close(); + + assertEquals(controlScore, 45); + } + + @Test + public void testExceedStartParam() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrClient(solrClient); + + runner.setProperty(QuerySolr.SOLR_PARAM_START, "10001"); + + runner.setNonLoopConnection(false); + + runner.run(); + runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + + assertEquals("10001", flowFile.getAttribute(QuerySolr.ATTRIBUTE_SOLR_START)); + assertEquals(0, runner.getContentAsByteArray(flowFile).length); + + solrClient.close(); + } + + @Test + public void testAttributesFailure() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrCloudClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler"); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0); + + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + 
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION); + flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE); + } + + @Test + public void testAttributes() throws IOException { + SolrClient solrClient = createSolrClient(); + TestRunner runner = createRunnerWithSolrCloudClient(solrClient); + + runner.setProperty("facet", "true"); + runner.setProperty("stats", "true"); + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(QuerySolr.RESULTS, 1); + runner.assertTransferCount(QuerySolr.FACETS, 1); + runner.assertTransferCount(QuerySolr.STATS, 1); + runner.assertTransferCount(QuerySolr.ORIGINAL, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0); + Map attributes = flowFile.getAttributes(); + + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_START); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_ROWS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + + assertEquals(SOLR_CONNECT, attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT)); + assertEquals(DEFAULT_SOLR_CORE, attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION)); + + assertEquals("q=*:*&qt=/select&start=0&rows=10&stats=true&facet=true", attributes.get(QuerySolr.ATTRIBUTE_SOLR_QUERY)); + assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_STATUS)); + assertEquals("0", 
attributes.get(QuerySolr.ATTRIBUTE_SOLR_START)); + assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_ROWS)); + assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS)); + assertEquals(QuerySolr.MIME_TYPE_XML, attributes.get(CoreAttributes.MIME_TYPE.key())); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0); + attributes = flowFile.getAttributes(); + + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_START); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_ROWS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + + assertEquals(SOLR_CONNECT, attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT)); + assertEquals(DEFAULT_SOLR_CORE, attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION)); + + assertEquals("q=*:*&qt=/select&start=0&rows=10&stats=true&facet=true", attributes.get(QuerySolr.ATTRIBUTE_SOLR_QUERY)); + assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_STATUS)); + assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_START)); + assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_ROWS)); + assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS)); + assertEquals(QuerySolr.MIME_TYPE_JSON, attributes.get(CoreAttributes.MIME_TYPE.key())); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0); + attributes = flowFile.getAttributes(); + + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + 
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_START); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_ROWS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME); + flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + + assertEquals(SOLR_CONNECT, attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT)); + assertEquals(DEFAULT_SOLR_CORE, attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION)); + + assertEquals("q=*:*&qt=/select&start=0&rows=10&stats=true&facet=true", attributes.get(QuerySolr.ATTRIBUTE_SOLR_QUERY)); + assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_STATUS)); + assertEquals("0", attributes.get(QuerySolr.ATTRIBUTE_SOLR_START)); + assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_ROWS)); + assertEquals("10", attributes.get(QuerySolr.ATTRIBUTE_SOLR_NUMBER_RESULTS)); + assertEquals(QuerySolr.MIME_TYPE_JSON, attributes.get(CoreAttributes.MIME_TYPE.key())); + + flowFile = runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0); + attributes = flowFile.getAttributes(); + + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_COLLECTION); + flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_QUERY); + + assertEquals(SOLR_CONNECT, attributes.get(QuerySolr.ATTRIBUTE_SOLR_CONNECT)); + assertEquals(DEFAULT_SOLR_CORE, attributes.get(QuerySolr.ATTRIBUTE_SOLR_COLLECTION)); + + solrClient.close(); + } + + // Override createSolrClient and return the passed in SolrClient + private class TestableProcessor extends QuerySolr { + private SolrClient solrClient; + + public TestableProcessor(SolrClient 
solrClient) { + this.solrClient = solrClient; + } + @Override + protected SolrClient createSolrClient(ProcessContext context, String solrLocation) { + return solrClient; + } + } +}