diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index ece7dbb21a..fb7b87e407 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -45,7 +45,7 @@ posix - + @@ -166,9 +166,14 @@ nifi-kite-nar nar + + org.apache.nifi + nifi-solr-nar + nar + - - + + 256 512 @@ -176,7 +181,7 @@ 128 10m 10 - + ${project.version} true @@ -204,7 +209,7 @@ 1 5 sec 4 - + org.apache.nifi.controller.repository.FileSystemRepository 10 MB 100 @@ -214,21 +219,21 @@ false false - - + + 30 sec ./lib ./work/nar/ ./work/docs/components - + PBEWITHMD5AND256BITAES-CBC-OPENSSL BC ;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE 9990 - + org.apache.nifi.provenance.PersistentProvenanceRepository ./provenance_repository @@ -243,15 +248,15 @@ 500 MB false 16 - + 100000 - + org.apache.nifi.controller.status.history.VolatileComponentStatusRepository 288 5 mins - + ./lib @@ -260,7 +265,7 @@ ./work/jetty 200 - + @@ -277,12 +282,12 @@ - + 5 sec false 30 sec - 45 sec + 45 sec false @@ -297,7 +302,7 @@ 2 - + false @@ -349,7 +354,7 @@ - + org.codehaus.mojo rpm-maven-plugin diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml new file mode 100644 index 0000000000..aeab88c189 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-nar/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-solr-bundle + 0.1.0-incubating-SNAPSHOT + + + nifi-solr-nar + nar + + + + org.apache.nifi + nifi-solr-processors + 0.1.0-incubating-SNAPSHOT + + + + diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml new file mode 100644 index 0000000000..0e05003351 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-solr-bundle + 0.1.0-incubating-SNAPSHOT + + + nifi-solr-processors + jar + + + + org.apache.solr + solr-solrj + ${solr.version} + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + commons-logging + commons-logging + 1.1.3 + test + + + junit + junit + 4.11 + test + + + org.apache.solr + solr-core + ${solr.version} + test + + + + org.apache.lucene + lucene-core + ${solr.version} + test + + + org.apache.lucene + lucene-analyzers-common + ${solr.version} + test + + + org.apache.lucene + lucene-queryparser + ${solr.version} + test + + + diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java new file mode 100644 index 0000000000..d230cc178e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Tags({"Apache", "Solr", "Get", "Pull"}) +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +public class GetSolr extends SolrProcessor { + + public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor + .Builder().name("Solr Query") + .description("A query to execute against Solr") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor + .Builder().name("Return Fields") + .description("Comma-separated list of fields names to return") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SORT_CLAUSE = new PropertyDescriptor + .Builder().name("Sort Clause") + .description("A Solr sort clause, ex: field1 asc, field2 desc") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor + .Builder().name("Date Field") + .description("The name of a date field in Solr used to filter results") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor + .Builder().name("Batch Size") + .description("Number of rows per Solr query") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The results of querying Solr") + .build(); + + static final String FILE_PREFIX = "conf/.getSolr-"; + static final String LAST_END_DATE = "LastEndDate"; + static final String LAST_END_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + static final String UNINITIALIZED_LAST_END_DATE_VALUE; + + static { + SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + UNINITIALIZED_LAST_END_DATE_VALUE = sdf.format(new Date(1L)); + } + + final AtomicReference lastEndDatedRef = new AtomicReference<>(UNINITIALIZED_LAST_END_DATE_VALUE); + + private Set relationships; + private List descriptors; + private final Lock fileLock = new ReentrantLock(); + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + + final List descriptors = new ArrayList<>(); + descriptors.add(SOLR_TYPE); + descriptors.add(SOLR_LOCATION); + descriptors.add(DEFAULT_COLLECTION); + descriptors.add(SOLR_QUERY); + descriptors.add(RETURN_FIELDS); + descriptors.add(SORT_CLAUSE); + descriptors.add(DATE_FIELD); + descriptors.add(BATCH_SIZE); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public List getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); + } + + @OnShutdown + public void onShutdown() { + writeLastEndDate(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + + final FlowFile incomingFlowFile = session.get(); + if (incomingFlowFile != null) { + session.transfer(incomingFlowFile, REL_SUCCESS); + logger.warn("found FlowFile {} in input queue; transferring to success", + new Object[]{incomingFlowFile}); + } + + readLastEndDate(); + + final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + final String currDate = sdf.format(new Date()); + + final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); + + final String query = context.getProperty(SOLR_QUERY).getValue(); + final SolrQuery solrQuery = new SolrQuery(query); + solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); + + // if initialized then apply a filter to restrict results from the last end time til now + if (initialized) { + StringBuilder filterQuery = new StringBuilder(); + filterQuery.append(context.getProperty(DATE_FIELD).getValue()) + .append(":{").append(lastEndDatedRef.get()).append(" TO ") + .append(currDate).append("]"); + solrQuery.addFilterQuery(filterQuery.toString()); + logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); + } + + final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); + if (returnFields != null && !returnFields.trim().isEmpty()) { + for (String returnField : returnFields.trim().split("[,]")) { + solrQuery.addField(returnField.trim()); + } + } + + final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); + if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { + for (String sortClause : fullSortClause.split("[,]")) { + String[] sortParts = sortClause.trim().split("[ ]"); + solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); + } + } + + try { + // run the initial query and send out the first page of results + final StopWatch stopWatch = new StopWatch(true); + QueryResponse response = getSolrServer().query(solrQuery); + stopWatch.stop(); + + long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); + + final SolrDocumentList documentList = response.getResults(); + logger.info("Retrieved {} results from Solr for {} in {} ms", + new Object[] {documentList.getNumFound(), query, duration}); + + if (documentList != null && documentList.getNumFound() > 0) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); + session.transfer(flowFile, REL_SUCCESS); + + StringBuilder transitUri = new StringBuilder("solr://"); + transitUri.append(context.getProperty(SOLR_LOCATION).getValue()); + if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { + transitUri.append("/").append(context.getProperty(DEFAULT_COLLECTION).getValue()); + } + + session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration); + + // if initialized then page through the results and send out each page + if (initialized) { + int endRow = response.getResults().size(); + long totalResults = response.getResults().getNumFound(); + + while (endRow < totalResults) { + solrQuery.setStart(endRow); + + stopWatch.start(); + response = getSolrServer().query(solrQuery); + stopWatch.stop(); + + duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); + logger.info("Retrieved results for {} in {} ms", new Object[]{query, duration}); + + flowFile = session.create(); + flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration); + endRow += response.getResults().size(); + } + } + } + + lastEndDatedRef.set(currDate); + writeLastEndDate(); + } catch (SolrServerException e) { + context.yield(); + session.rollback(); + logger.error("Failed to execute query {} due to {}", new Object[]{query, e}, e); + throw new ProcessException(e); + } catch (final Throwable t) { + context.yield(); + session.rollback(); + logger.error("Failed to execute query {} due to {}", new Object[]{query, t}, t); + throw t; + } + } + + private void readLastEndDate() { + fileLock.lock(); + File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); + try (FileInputStream fis = new FileInputStream(lastEndDateCache)) { + Properties props = new Properties(); + props.load(fis); + lastEndDatedRef.set(props.getProperty(LAST_END_DATE)); + } catch (IOException swallow) { + } finally { + fileLock.unlock(); + } + } + + private void writeLastEndDate() { + fileLock.lock(); + File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); + try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) { + Properties props = new Properties(); + props.setProperty(LAST_END_DATE, lastEndDatedRef.get()); + props.store(fos, "GetSolr LastEndDate value"); + } catch (IOException e) { + getLogger().error("Failed to persist LastEndDate due to " + e, e); + } finally { + fileLock.unlock(); + } + } + + /** + * Writes each SolrDocument in XML format to the OutputStream. + */ + private class QueryResponseOutputStreamCallback implements OutputStreamCallback { + private QueryResponse response; + + public QueryResponseOutputStreamCallback(QueryResponse response) { + this.response = response; + } + + @Override + public void process(OutputStream out) throws IOException { + for (SolrDocument doc : response.getResults()) { + String xml = ClientUtils.toXML(ClientUtils.toSolrInputDocument(doc)); + IOUtils.write(xml, out); + } + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java new file mode 100644 index 0000000000..704d8a21e1 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; +import org.apache.nifi.util.StopWatch; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import org.apache.solr.common.util.ContentStreamBase; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.*; +import java.util.concurrent.TimeUnit; + +@Tags({"Apache", "Solr", "Put", "Send"}) +@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr") +public class PutSolrContentStream extends SolrProcessor { + + public static final PropertyDescriptor CONTENT_STREAM_URL = new PropertyDescriptor + .Builder().name("Content Stream URL") + .description("The URL in Solr to post the ContentStream") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("/update/json/docs") + .build(); + + public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor + .Builder().name("Content-Type") + .description("Content-Type being sent to Solr") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("application/json") + .build(); + + public static final PropertyDescriptor REQUEST_PARAMS = new PropertyDescriptor + .Builder().name("Request Parameters") + .description("Additional parameters to pass to Solr on each request, i.e. key1=val1&key2=val2") + .required(false) + .addValidator(RequestParamsUtil.getValidator()) + .defaultValue("json.command=false&split=/&f=id:/field1") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFile") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed for any reason other than Solr being unreachable") + .build(); + + public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder() + .name("connection_failure") + .description("FlowFiles that failed because Solr is unreachable") + .build(); + + /** + * The name of a FlowFile attribute used for specifying a Solr collection. + */ + public static final String SOLR_COLLECTION_ATTR = "solr.collection"; + + private Set relationships; + private List descriptors; + private volatile MultiMapSolrParams requestParams; + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + + final List descriptors = new ArrayList<>(); + descriptors.add(SOLR_TYPE); + descriptors.add(SOLR_LOCATION); + descriptors.add(DEFAULT_COLLECTION); + descriptors.add(CONTENT_STREAM_URL); + descriptors.add(CONTENT_TYPE); + descriptors.add(REQUEST_PARAMS); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + relationships.add(REL_CONNECTION_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public List getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @Override + protected void additionalOnScheduled(ProcessContext context) { + final String requestParamsVal = context.getProperty(REQUEST_PARAMS).getValue(); + this.requestParams = RequestParamsUtil.parse(requestParamsVal); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + final ObjectHolder error = new ObjectHolder<>(null); + final ObjectHolder connectionError = new ObjectHolder<>(null); + final ObjectHolder collectionUsed = new ObjectHolder<>(null); + + final String collectionAttrVal = flowFile.getAttribute(SOLR_COLLECTION_ATTR); + final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue()); + + StopWatch timer = new StopWatch(true); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + ContentStreamUpdateRequest request = new ContentStreamUpdateRequest( + context.getProperty(CONTENT_STREAM_URL).getValue()); + request.setParams(new ModifiableSolrParams()); + + // add the extra params, don't use 'set' in case of repeating params + Iterator paramNames = requestParams.getParameterNamesIterator(); + while (paramNames.hasNext()) { + String paramName = paramNames.next(); + for (String paramValue : requestParams.getParams(paramName)) { + request.getParams().add(paramName, paramValue); + } + } + + // send the request to the specified collection, or to the default collection + if (isSolrCloud) { + String collection = collectionAttrVal; + if (StringUtils.isBlank(collection)) { + collection = context.getProperty(DEFAULT_COLLECTION).getValue(); + } + request.setParam("collection", collection); + collectionUsed.set(collection); + } + + try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) { + // add the FlowFile's content on the UpdateRequest + request.addContentStream(new ContentStreamBase() { + @Override + public InputStream getStream() throws IOException { + return bufferedIn; + } + + @Override + public String getContentType() { + return context.getProperty(CONTENT_TYPE).getValue(); + } + }); + + UpdateResponse response = request.process(getSolrServer()); + getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()}); + } catch (SolrException e) { + error.set(e); + } catch (SolrServerException e) { + connectionError.set(e); + } + } + }); + timer.stop(); + + if (error.get() != null) { + getLogger().error("Failed to send {} to Solr due to {} with status code {}; routing to failure", + new Object[]{flowFile, error.get(), error.get().code()}); + session.transfer(flowFile, REL_FAILURE); + } else if (connectionError.get() != null) { + getLogger().error("Failed to send {} to Solr due to {}; routing to connection_failure", + new Object[]{flowFile, connectionError.get()}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_CONNECTION_FAILURE); + } else { + StringBuilder transitUri = new StringBuilder("solr://"); + transitUri.append(context.getProperty(SOLR_LOCATION).getValue()); + if (isSolrCloud) { + transitUri.append(":").append(collectionUsed.get()); + } + + final long duration = timer.getDuration(TimeUnit.MILLISECONDS); + session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true); + getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration}); + session.transfer(flowFile, REL_ORIGINAL); + } + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java new file mode 100644 index 0000000000..647f04e05d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.solr.common.params.MultiMapSolrParams; + +import java.util.HashMap; +import java.util.Map; + +public class RequestParamsUtil { + + /** + * Parses a String of request params into a MultiMap. + * + * @param requestParams + * the value of the request params property + * @return + */ + public static MultiMapSolrParams parse(final String requestParams) { + final Map paramsMap = new HashMap<>(); + if (requestParams == null || requestParams.trim().isEmpty()) { + return new MultiMapSolrParams(paramsMap); + } + + final String[] params = requestParams.split("[&]"); + if (params == null || params.length == 0) { + throw new IllegalStateException( + "Parameters must be in form k1=v1&k2=v2, was" + requestParams); + } + + for (final String param : params) { + final String[] keyVal = param.split("="); + if (keyVal.length != 2) { + throw new IllegalStateException( + "Parameter must be in form key=value, was " + param); + } + + final String key = keyVal[0].trim(); + final String val = keyVal[1].trim(); + MultiMapSolrParams.addParam(key, val, paramsMap); + } + + return new MultiMapSolrParams(paramsMap); + } + + /** + * Creates a property validator for a request params string. + * + * @return valid if the input parses successfully, invalid otherwise + */ + public static Validator getValidator() { + return new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + try { + RequestParamsUtil.parse(input); + return new ValidationResult.Builder().subject(subject).input(input) + .explanation("Valid Params").valid(true).build(); + } catch (final Exception e) { + return new ValidationResult.Builder().subject(subject).input(input) + .explanation("Invalid Params" + e.getMessage()).valid(false).build(); + } + } + }; + } +} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java new file mode 100644 index 0000000000..f286a1a182 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * A base class for processors that interact with Apache Solr. + * + */ +public abstract class SolrProcessor extends AbstractProcessor { + + public static final AllowableValue SOLR_TYPE_CLOUD = new AllowableValue( + "Cloud", "Cloud", "A SolrCloud instance."); + + public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( + "Standard", "Standard", "A stand-alone Solr instance."); + + public static final PropertyDescriptor SOLR_TYPE = new PropertyDescriptor + .Builder().name("Solr Type") + .description("The type of Solr instance, Cloud or Standard.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD) + .defaultValue(SOLR_TYPE_STANDARD.getValue()) + .build(); + + public static final PropertyDescriptor SOLR_LOCATION = new PropertyDescriptor + .Builder().name("Solr Location") + .description("The Solr url for a Solr Type of Standard, " + + "or the ZooKeeper hosts for a Solr Type of Cloud.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DEFAULT_COLLECTION = new PropertyDescriptor + .Builder().name("Default Collection") + .description("The Solr collection name, only used with a Solr Type of Cloud") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private volatile SolrClient solrServer; + + @OnScheduled + public final void onScheduled(final ProcessContext context) throws IOException { + this.solrServer = createSolrServer(context); + additionalOnScheduled(context); + } + + /** + * Create a SolrServer based on the type of Solr specified. + * + * @param context + * The context + * @return an HttpSolrServer or CloudSolrServer + */ + protected SolrClient createSolrServer(final ProcessContext context) { + if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) { + return new HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue()); + } else { + CloudSolrClient cloudSolrServer = new CloudSolrClient( + context.getProperty(SOLR_LOCATION).getValue()); + cloudSolrServer.setDefaultCollection( + context.getProperty(DEFAULT_COLLECTION).getValue()); + return cloudSolrServer; + } + } + + /** + * Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the + * {@link #createSolrServer(org.apache.nifi.processor.ProcessContext)} method + * + * @return + */ + protected final SolrClient getSolrServer() { + return solrServer; + } + + /** + * Allows additional action to be taken during scheduling of processor. + * + * @param context + * The context + */ + protected void additionalOnScheduled(final ProcessContext context) { + + } + + @Override + protected final Collection customValidate(ValidationContext context) { + final List problems = new ArrayList<>(); + + if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { + final String collection = context.getProperty(DEFAULT_COLLECTION).getValue(); + if (collection == null || collection.trim().isEmpty()) { + problems.add(new ValidationResult.Builder() + .subject(DEFAULT_COLLECTION.getName()) + .input(collection).valid(false) + .explanation("A collection must specified for Solr Type of Cloud") + .build()); + } + } + + Collection otherProblems = this.additionalCustomValidation(context); + if (otherProblems != null) { + problems.addAll(otherProblems); + } + + return problems; + } + + /** + * Allows additional custom validation to be done. This will be called from + * the parent's customValidation method. + * + * @param context + * The context + * @return Validation results indicating problems + */ + protected Collection additionalCustomValidation(ValidationContext context) { + return new ArrayList<>(); + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..657d0e8433 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.solr.PutSolrContentStream +org.apache.nifi.processors.solr.GetSolr diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java new file mode 100644 index 0000000000..555c875910 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/EmbeddedSolrServerFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.commons.io.FileUtils; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +/** + * Helper to create EmbeddedSolrServer instances for testing. + */ +public class EmbeddedSolrServerFactory { + + public static final String DEFAULT_SOLR_HOME = "src/test/resources/solr"; + public static final String DEFAULT_CORE_HOME = "src/test/resources/"; + public static final String DEFAULT_DATA_DIR = "target"; + + /** + * Use the defaults to create the core. + * + * @param coreName + * @return + */ + public static SolrClient create(String coreName) throws IOException { + return create(DEFAULT_SOLR_HOME, DEFAULT_CORE_HOME, + coreName, DEFAULT_DATA_DIR); + } + + /** + * + * @param solrHome + * path to directory where solr.xml lives + * @param coreName + * the name of the core to load + * @param dataDir + * the data dir for the core + * + * @return an EmbeddedSolrServer for the given core + */ + public static SolrClient create(String solrHome, String coreHome, String coreName, String dataDir) + throws IOException { + + Properties props = new Properties(); + if (dataDir != null) { + File coreDataDir = new File(dataDir + "/" + coreName); + if (coreDataDir.exists()) { + FileUtils.deleteDirectory(coreDataDir); + } + props.setProperty("dataDir", dataDir + "/" + coreName); + } + + CoreContainer coreContainer = new CoreContainer(solrHome); + coreContainer.load(); + + CoreDescriptor descriptor = new CoreDescriptor(coreContainer, coreName, + new File(coreHome, coreName).getAbsolutePath(), props); + + coreContainer.create(descriptor); + return new EmbeddedSolrServer(coreContainer, coreName); + } +} + + diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java new file mode 100644 index 0000000000..5a1373e4eb --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.junit.Assert; +import org.apache.solr.common.params.MultiMapSolrParams; +import org.junit.Test; + + +public class RequestParamsUtilTest { + + @Test + public void testSimpleParse() { + MultiMapSolrParams map = RequestParamsUtil.parse("a=1&b=2&c=3"); + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals("2", map.get("b")); + Assert.assertEquals("3", map.get("c")); + } + + @Test + public void testParseWithSpaces() { + MultiMapSolrParams map = RequestParamsUtil.parse("a = 1 &b= 2& c= 3 "); + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals("2", map.get("b")); + Assert.assertEquals("3", map.get("c")); + } + + @Test(expected = IllegalStateException.class) + public void testMalformedParamsParse() { + RequestParamsUtil.parse("a=1&b&c=3"); + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java new file mode 100644 index 0000000000..52eb06b3e1 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.*; + +import static org.junit.Assert.assertTrue; + +public class TestGetSolr { + + static final String DEFAULT_SOLR_CORE = "testCollection"; + + private SolrClient solrClient; + + @Before + public void setup() { + // create the conf dir if it doesn't exist + File confDir = new File("conf"); + if (!confDir.exists()) { + confDir.mkdir(); + } + + try { + // create an EmbeddedSolrServer for the processor to use + String relPath = getClass().getProtectionDomain().getCodeSource() + .getLocation().getFile() + "../../target"; + + solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, + EmbeddedSolrServerFactory.DEFAULT_CORE_HOME, DEFAULT_SOLR_CORE, relPath); + + // create some test documents + SolrInputDocument doc1 = new SolrInputDocument(); + doc1.addField("first", "bob"); + doc1.addField("last", "smith"); + doc1.addField("created", new Date()); + + SolrInputDocument doc2 = new SolrInputDocument(); + doc2.addField("first", "alice"); + doc2.addField("last", "smith"); + doc2.addField("created", new Date()); + + SolrInputDocument doc3 = new SolrInputDocument(); + doc3.addField("first", "mike"); + doc3.addField("last", "smith"); + doc3.addField("created", new Date()); + + SolrInputDocument doc4 = new SolrInputDocument(); + doc4.addField("first", "john"); + doc4.addField("last", "smith"); + doc4.addField("created", new Date()); + + SolrInputDocument doc5 = new SolrInputDocument(); + doc5.addField("first", "joan"); + doc5.addField("last", "smith"); + doc5.addField("created", new Date()); + + // add the test data to the index + solrClient.add(doc1); + solrClient.add(doc2); + solrClient.add(doc3); + solrClient.add(doc4); + solrClient.add(doc5); + solrClient.commit(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @After + public void teardown() { + File confDir = new File("conf"); + assertTrue(confDir.exists()); + File[] files = confDir.listFiles(); + assertTrue(files.length > 0); + for (File file : files) { + assertTrue("Failed to delete " + file.getName(), file.delete()); + } + assertTrue(confDir.delete()); + + try { + solrClient.shutdown(); + } catch (Exception e) { + } + } + + @Test + public void testMoreThanBatchSizeShouldProduceMultipleFlowFiles() throws IOException, SolrServerException { + final TestableProcessor proc = new TestableProcessor(solrClient); + final TestRunner runner = TestRunners.newTestRunner(proc); + + // setup a lastEndDate file to simulate picking up from a previous end date + SimpleDateFormat sdf = new SimpleDateFormat(GetSolr.LAST_END_DATE_PATTERN, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + + Calendar cal = new GregorianCalendar(); + cal.add(Calendar.MINUTE, -30); + final String lastEndDate = sdf.format(cal.getTime()); + + File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier()); + try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) { + Properties props = new Properties(); + props.setProperty(GetSolr.LAST_END_DATE, lastEndDate); + props.store(fos, "GetSolr LastEndDate value"); + } catch (IOException e) { + Assert.fail("Failed to setup last end date value: " + e.getMessage()); + } + + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.SOLR_QUERY, "last:smith"); + runner.setProperty(GetSolr.RETURN_FIELDS, "first, last, created"); + runner.setProperty(GetSolr.SORT_CLAUSE, "created desc, first asc"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "2"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 3); + } + + @Test + public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException { + final TestableProcessor proc = new TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.SOLR_QUERY, "last:smith"); + runner.setProperty(GetSolr.RETURN_FIELDS, "created"); + runner.setProperty(GetSolr.SORT_CLAUSE, "created desc"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "10"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1); + } + + @Test + public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException { + final TestableProcessor proc = new TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.SOLR_QUERY, "last:xyz"); + runner.setProperty(GetSolr.RETURN_FIELDS, "created"); + runner.setProperty(GetSolr.SORT_CLAUSE, "created desc"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "10"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0); + } + + + // Override createSolrServer and return the passed in SolrClient + private class TestableProcessor extends GetSolr { + private SolrClient solrClient; + + public TestableProcessor(SolrClient solrClient) { + this.solrClient = solrClient; + } + @Override + protected SolrClient createSolrServer(ProcessContext context) { + return solrClient; + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java new file mode 100644 index 0000000000..88009788ba --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.*; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrException; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import static org.mockito.Mockito.*; + +/** + * Test for PutSolr processor. + */ +public class TestPutSolrContentStream { + + static final String DEFAULT_SOLR_CORE = "testCollection"; + + static final String CUSTOM_JSON_SINGLE_DOC_FILE = "src/test/resources/testdata/test-custom-json-single-doc.json"; + static final String SOLR_JSON_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-solr-json-multiple-docs.json"; + static final String CSV_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-csv-multiple-docs.csv"; + static final String XML_MULTIPLE_DOCS_FILE = "src/test/resources/testdata/test-xml-multiple-docs.xml"; + + static final SolrDocument expectedDoc1 = new SolrDocument(); + static { + expectedDoc1.addField("first", "John"); + expectedDoc1.addField("last", "Doe"); + expectedDoc1.addField("grade", 8); + expectedDoc1.addField("subject", "Math"); + expectedDoc1.addField("test", "term1"); + expectedDoc1.addField("marks", 90); + } + + static final SolrDocument expectedDoc2 = new SolrDocument(); + static { + expectedDoc2.addField("first", "John"); + expectedDoc2.addField("last", "Doe"); + expectedDoc2.addField("grade", 8); + expectedDoc2.addField("subject", "Biology"); + expectedDoc2.addField("test", "term1"); + expectedDoc2.addField("marks", 86); + } + + /** + * Creates a base TestRunner with Solr Type of standard and json update path. + */ + private static TestRunner createDefaultTestRunner(PutSolrContentStream processor) { + TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + return runner; + } + + @Test + public void testUpdateWithSolrJson() throws IOException, SolrServerException { + final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs"); + runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,"json.command=false"); + + try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) { + runner.enqueue(fileIn); + + runner.run(); + runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1); + + verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2)); + } finally { + try { proc.getSolrServer().shutdown(); } catch (Exception e) { } + } + } + + @Test + public void testUpdateWithCustomJson() throws IOException, SolrServerException { + final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs"); + runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, + "split=/exams" + + "&f=first:/first" + + "&f=last:/last" + + "&f=grade:/grade" + + "&f=subject:/exams/subject" + + "&f=test:/exams/test" + + "&f=marks:/exams/marks"); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + + runner.run(); + runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1); + + verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2)); + } finally { + try { proc.getSolrServer().shutdown(); } catch (Exception e) { } + } + } + + @Test + public void testUpdateWithCsv() throws IOException, SolrServerException { + final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/csv"); + runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, + "fieldnames=first,last,grade,subject,test,marks"); + + try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) { + runner.enqueue(fileIn); + + runner.run(); + runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1); + + verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2)); + } finally { + try { proc.getSolrServer().shutdown(); } catch (Exception e) { } + } + } + + @Test + public void testUpdateWithXml() throws IOException, SolrServerException { + final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE); + + final TestRunner runner = createDefaultTestRunner(proc); + runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update"); + runner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml"); + + try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) { + runner.enqueue(fileIn); + + runner.run(); + runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0); + runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1); + + verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2)); + } finally { + try { proc.getSolrServer().shutdown(); } catch (Exception e) { } + } + } + + @Test + public void testSolrServerExceptionShouldRouteToConnectionFailure() throws IOException, SolrServerException { + final Throwable throwable = new SolrServerException("Error communicating with Solr"); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1); + verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class)); + } + } + + @Test + public void testSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException { + final Throwable throwable = new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error"); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1); + verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class)); + } + } + + @Test + public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException { + final Throwable throwable = new HttpSolrClient.RemoteSolrException( + "host", 401, "error", new NumberFormatException()); + final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable); + + final TestRunner runner = createDefaultTestRunner(proc); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1); + verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class)); + } + } + + + @Test + public void testSolrTypeCloudShouldRequireCollection() { + final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); + runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.assertNotValid(); + + runner.setProperty(PutSolrContentStream.DEFAULT_COLLECTION, "someCollection1"); + runner.assertValid(); + } + + @Test + public void testSolrTypeStandardShouldNotRequireCollection() { + final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); + runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.assertValid(); + } + + @Test + public void testRequestParamsShouldBeInvalid() { + final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); + runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "a=1&b"); + runner.assertNotValid(); + } + + /** + * Override the creatrSolrServer method to inject a Mock. + */ + private class ExceptionThrowingProcessor extends PutSolrContentStream { + + private SolrClient mockSolrServer; + private Throwable throwable; + + public ExceptionThrowingProcessor(Throwable throwable) { + this.throwable = throwable; + } + + @Override + protected SolrClient createSolrServer(ProcessContext context) { + mockSolrServer = Mockito.mock(SolrClient.class); + try { + when(mockSolrServer.request(any(SolrRequest.class))).thenThrow(throwable); + } catch (SolrServerException e) { + Assert.fail(e.getMessage()); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + return mockSolrServer; + } + + } + + /** + * Override the createSolrServer method and create and EmbeddedSolrServer. + */ + private class EmbeddedSolrServerProcessor extends PutSolrContentStream { + + private String coreName; + private SolrClient embeddedSolrServer; + + public EmbeddedSolrServerProcessor(String coreName) { + this.coreName = coreName; + } + + @Override + protected SolrClient createSolrServer(ProcessContext context) { + try { + String relPath = getClass().getProtectionDomain() + .getCodeSource().getLocation().getFile() + + "../../target"; + + embeddedSolrServer = EmbeddedSolrServerFactory.create( + EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, + EmbeddedSolrServerFactory.DEFAULT_CORE_HOME, + coreName, relPath); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + return embeddedSolrServer; + } + + } + + /** + * Verify that given SolrServer contains the expected SolrDocuments. + */ + private static void verifySolrDocuments(SolrClient solrServer, Collection expectedDocuments) + throws IOException, SolrServerException { + + solrServer.commit(); + + SolrQuery query = new SolrQuery("*:*"); + QueryResponse qResponse = solrServer.query(query); + Assert.assertEquals(expectedDocuments.size(), qResponse.getResults().getNumFound()); + + // verify documents have expected fields and values + for (SolrDocument expectedDoc : expectedDocuments) { + boolean found = false; + for (SolrDocument solrDocument : qResponse.getResults()) { + boolean foundAllFields = true; + for (String expectedField : expectedDoc.getFieldNames()) { + Object expectedVal = expectedDoc.getFirstValue(expectedField); + Object actualVal = solrDocument.getFirstValue(expectedField); + foundAllFields = expectedVal.equals(actualVal); + } + + if (foundAllFields) { + found = true; + break; + } + } + Assert.assertTrue("Could not find " + expectedDoc, found); + } + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000000..4a3bdd369e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/log4j.properties @@ -0,0 +1,14 @@ +# Logging level +solr.log=logs/ +log4j.rootLogger=INFO, CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n + +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.org.apache.hadoop=WARN + +# set to INFO to enable infostream log messages +log4j.logger.org.apache.solr.update.LoggingInfoStream=OFF \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml new file mode 100644 index 0000000000..86fb3dbedf --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/solr.xml @@ -0,0 +1,18 @@ + + + + + ${host:} + ${jetty.port:8983} + ${hostContext:solr} + ${zkClientTimeout:30000} + ${genericCoreNodeNames:true} + + + + ${socketTimeout:0} + ${connTimeout:0} + + + diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json new file mode 100644 index 0000000000..e7ada3f6e4 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/_rest_managed.json @@ -0,0 +1,3 @@ +{ + "initArgs":{}, + "managedList":[]} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt new file mode 100644 index 0000000000..2c164c0b2a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/lang/stopwords_en.txt @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# a couple of test stopwords to test that the words are really being +# configured from this file: +stopworda +stopwordb + +# Standard english stop words taken from Lucene's StopAnalyzer +a +an +and +are +as +at +be +but +by +for +if +in +into +is +it +no +not +of +on +or +such +that +the +their +then +there +these +they +this +to +was +will +with diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/protwords.txt b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/protwords.txt new file mode 100644 index 0000000000..1dfc0abecb --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/protwords.txt @@ -0,0 +1,21 @@ +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#----------------------------------------------------------------------- +# Use a protected word file to protect against the stemmer reducing two +# unrelated words to the same base word. + +# Some non-words that normally won't be encountered, +# just to test that they won't be stemmed. +dontstems +zwhacky + diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/schema.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/schema.xml new file mode 100644 index 0000000000..d2f7e8f5ba --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/schema.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/solrconfig.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/solrconfig.xml new file mode 100644 index 0000000000..148a2db3bc --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/solrconfig.xml @@ -0,0 +1,20 @@ + + + 5.0.0 + + ${solr.data.dir:} + + + + single + + + + + + + + + + \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/synonyms.txt b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/synonyms.txt new file mode 100644 index 0000000000..7f72128303 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/conf/synonyms.txt @@ -0,0 +1,29 @@ +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#----------------------------------------------------------------------- +#some test synonym mappings unlikely to appear in real input text +aaafoo => aaabar +bbbfoo => bbbfoo bbbbar +cccfoo => cccbar cccbaz +fooaaa,baraaa,bazaaa + +# Some synonym groups specific to this example +GB,gib,gigabyte,gigabytes +MB,mib,megabyte,megabytes +Television, Televisions, TV, TVs +#notice we use "gib" instead of "GiB" so any WordDelimiterFilter coming +#after us won't split it into two words. + +# Synonym mappings can be used for spelling correction too +pixima => pixma + diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/core.properties b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/core.properties new file mode 100644 index 0000000000..4e16ece8c1 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testCollection/core.properties @@ -0,0 +1 @@ +name=jsonCollection \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv new file mode 100644 index 0000000000..5657a89ef9 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv @@ -0,0 +1,2 @@ +John,Doe,8,Math,term1,90 +John,Doe,8,Biology,term1,86 diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json new file mode 100644 index 0000000000..5cca8078ac --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json @@ -0,0 +1,15 @@ +{ + "first": "John", + "last": "Doe", + "grade": 8, + "exams": [ + { + "subject": "Math", + "test" : "term1", + "marks":90}, + { + "subject": "Biology", + "test" : "term1", + "marks":86} + ] +} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json new file mode 100644 index 0000000000..cea939bc2d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json @@ -0,0 +1,18 @@ +[ +{ + "first": "John", + "last": "Doe", + "grade": 8, + "subject": "Math", + "test" : "term1", + "marks": 90 +}, +{ + "first": "John", + "last": "Doe", + "grade": 8, + "subject": "Biology", + "test" : "term1", + "marks": 86 +} +] \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml new file mode 100644 index 0000000000..4622e0dd78 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml @@ -0,0 +1,18 @@ + + + John + Doe + 8 + Math + term1 + 90 + + + John + Doe + 8 + Biology + term1 + 86 + + \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-solr-bundle/pom.xml new file mode 100644 index 0000000000..d716b88060 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/pom.xml @@ -0,0 +1,39 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 0.1.0-incubating-SNAPSHOT + + + nifi-solr-bundle + pom + + A bundle of processors that can store and retrieve data from Apache Solr + + + 5.0.0 + + + + nifi-solr-processors + nifi-solr-nar + + + diff --git a/nifi/nifi-nar-bundles/pom.xml b/nifi/nifi-nar-bundles/pom.xml index e7c122d5e8..8acf9b96e4 100644 --- a/nifi/nifi-nar-bundles/pom.xml +++ b/nifi/nifi-nar-bundles/pom.xml @@ -35,6 +35,7 @@ nifi-update-attribute-bundle nifi-kafka-bundle nifi-kite-bundle + nifi-solr-bundle diff --git a/nifi/pom.xml b/nifi/pom.xml index e7d8f5ba5e..0c5ca9aceb 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -170,12 +170,12 @@ org.antlr antlr-runtime 3.5.2 - + commons-codec commons-codec 1.10 - + commons-net commons-net @@ -220,7 +220,7 @@ net.sf.saxon Saxon-HE 9.6.0-4 - + stax stax-api @@ -231,10 +231,10 @@ quartz 2.2.1 - + --> c3p0 c3p0 @@ -303,7 +303,7 @@ spring-core ${spring.version} - commons-logging @@ -470,7 +470,7 @@ com.jcraft jzlib 1.1.3 - + joda-time joda-time @@ -516,7 +516,7 @@ hadoop-hdfs ${hadoop.version} - + org.apache.hadoop hadoop-yarn-api @@ -542,7 +542,7 @@ avro 1.7.6 - + com.sun.jersey jersey-server @@ -562,7 +562,7 @@ com.sun.jersey jersey-servlet ${jersey.version} - + com.sun.jersey jersey-client @@ -627,7 +627,7 @@ jetty-jsp-jdt 2.3.3 provided - + com.google.guava guava @@ -781,6 +781,12 @@ 0.1.0-incubating-SNAPSHOT nar + + org.apache.nifi + nifi-solr-nar + 0.1.0-incubating-SNAPSHOT + nar + org.apache.nifi nifi-kafka-nar @@ -802,7 +808,7 @@ org.apache.nifi nifi-security-utils 0.1.0-incubating-SNAPSHOT - + org.apache.nifi nifi-logging-utils @@ -852,7 +858,7 @@ slf4j-simple test - + @@ -999,7 +1005,7 @@ ${maven.min-version} - +