diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java index 7cef7a5a04..89b510c401 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -79,14 +79,6 @@ public class PutSolrContentStream extends SolrProcessor { .expressionLanguageSupported(true) .build(); - public static final PropertyDescriptor REQUEST_PARAMS = new PropertyDescriptor - .Builder().name("Request Parameters") - .description("Additional parameters to pass to Solr on each request, i.e. key1=val1&key2=val2") - .required(false) - .addValidator(RequestParamsUtil.getValidator()) - .defaultValue("json.command=false&split=/&f=id:/field1") - .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("The original FlowFile") @@ -104,10 +96,10 @@ public class PutSolrContentStream extends SolrProcessor { public static final String COLLECTION_PARAM_NAME = "collection"; public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin"; + public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+"; private Set relationships; private List descriptors; - private volatile MultiMapSolrParams requestParams; @Override protected void init(final ProcessorInitializationContext context) { @@ -120,7 +112,6 @@ public class PutSolrContentStream extends SolrProcessor { descriptors.add(CONTENT_STREAM_PATH); descriptors.add(CONTENT_TYPE); descriptors.add(COMMIT_WITHIN); - descriptors.add(REQUEST_PARAMS); this.descriptors = Collections.unmodifiableList(descriptors); final Set relationships = new HashSet<>(); @@ -151,12 +142,6 @@ public class PutSolrContentStream extends SolrProcessor { .build(); } - @OnScheduled - public void initializeRequestParams(ProcessContext context) { - final String requestParamsVal = context.getProperty(REQUEST_PARAMS).getValue(); - this.requestParams = RequestParamsUtil.parse(requestParamsVal); - } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -171,6 +156,8 @@ public class PutSolrContentStream extends SolrProcessor { final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue(); final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong(); + final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile)); + StopWatch timer = new StopWatch(true); session.read(flowFile, new InputStreamCallback() { @Override @@ -250,6 +237,7 @@ public class PutSolrContentStream extends SolrProcessor { // get all of the dynamic properties and values into a Map for later adding to the Solr request private Map getRequestParams(ProcessContext context, FlowFile flowFile) { final Map paramsMap = new HashMap<>(); + final SortedMap repeatingParams = new TreeMap<>(); for (final Map.Entry entry : context.getProperties().entrySet()) { final PropertyDescriptor descriptor = entry.getKey(); @@ -258,10 +246,22 @@ public class PutSolrContentStream extends SolrProcessor { final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); if (!paramValue.trim().isEmpty()) { - MultiMapSolrParams.addParam(paramName, paramValue, paramsMap); + if (paramName.matches(REPEATING_PARAM_PATTERN)) { + repeatingParams.put(paramName, paramValue); + } else { + MultiMapSolrParams.addParam(paramName, paramValue, paramsMap); + } } } } + + for (final Map.Entry entry : repeatingParams.entrySet()) { + final String paramName = entry.getKey(); + final String paramValue = entry.getValue(); + final int idx = paramName.lastIndexOf("."); + MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap); + } + return paramsMap; } diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java deleted file mode 100644 index 647f04e05d..0000000000 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/RequestParamsUtil.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.nifi.processors.solr; - -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.solr.common.params.MultiMapSolrParams; - -import java.util.HashMap; -import java.util.Map; - -public class RequestParamsUtil { - - /** - * Parses a String of request params into a MultiMap. - * - * @param requestParams - * the value of the request params property - * @return - */ - public static MultiMapSolrParams parse(final String requestParams) { - final Map paramsMap = new HashMap<>(); - if (requestParams == null || requestParams.trim().isEmpty()) { - return new MultiMapSolrParams(paramsMap); - } - - final String[] params = requestParams.split("[&]"); - if (params == null || params.length == 0) { - throw new IllegalStateException( - "Parameters must be in form k1=v1&k2=v2, was" + requestParams); - } - - for (final String param : params) { - final String[] keyVal = param.split("="); - if (keyVal.length != 2) { - throw new IllegalStateException( - "Parameter must be in form key=value, was " + param); - } - - final String key = keyVal[0].trim(); - final String val = keyVal[1].trim(); - MultiMapSolrParams.addParam(key, val, paramsMap); - } - - return new MultiMapSolrParams(paramsMap); - } - - /** - * Creates a property validator for a request params string. - * - * @return valid if the input parses successfully, invalid otherwise - */ - public static Validator getValidator() { - return new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - try { - RequestParamsUtil.parse(input); - return new ValidationResult.Builder().subject(subject).input(input) - .explanation("Valid Params").valid(true).build(); - } catch (final Exception e) { - return new ValidationResult.Builder().subject(subject).input(input) - .explanation("Invalid Params" + e.getMessage()).valid(false).build(); - } - } - }; - } -} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html new file mode 100644 index 0000000000..054cdb6f0e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html @@ -0,0 +1,48 @@ + + + + + + PutSolrContentStream + + + + +

Usage Example

+

+ This processor streams the contents of a FlowFile to an Apache Solr + update handler. Any properties added to this processor by the user are + passed to Solr on the update request. If a parameter must be sent multiple + times with different values, properties can follow a naming convention: + name.number, where name is the parameter name and number is a unique number. + Repeating parameters will be sorted by their property name. +

+

+ Example: To specify multiple 'f' parameters for indexing custom json, the following + properties can be defined: +

+
    +
  • split: /exams
  • +
  • f.1: first:/first
  • +
  • f.2: last:/last
  • +
  • f.3: grade:/grade
  • +
+

+ This will result in sending the following url to Solr:
+ split=/exams&f=first:/first&f=last:/last&f=grade:/grade +

+ + diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java deleted file mode 100644 index 5a1373e4eb..0000000000 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/RequestParamsUtilTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.nifi.processors.solr; - -import org.junit.Assert; -import org.apache.solr.common.params.MultiMapSolrParams; -import org.junit.Test; - - -public class RequestParamsUtilTest { - - @Test - public void testSimpleParse() { - MultiMapSolrParams map = RequestParamsUtil.parse("a=1&b=2&c=3"); - Assert.assertEquals("1", map.get("a")); - Assert.assertEquals("2", map.get("b")); - Assert.assertEquals("3", map.get("c")); - } - - @Test - public void testParseWithSpaces() { - MultiMapSolrParams map = RequestParamsUtil.parse("a = 1 &b= 2& c= 3 "); - Assert.assertEquals("1", map.get("a")); - Assert.assertEquals("2", map.get("b")); - Assert.assertEquals("3", map.get("c")); - } - - @Test(expected = IllegalStateException.class) - public void testMalformedParamsParse() { - RequestParamsUtil.parse("a=1&b&c=3"); - } - -} diff --git a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java index 98761c4947..141fbb666a 100644 --- a/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java +++ b/nifi/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -83,7 +83,7 @@ public class TestPutSolrContentStream { final TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs"); - runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,"json.command=false"); + runner.setProperty("json.command", "false"); try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) { runner.enqueue(fileIn); @@ -105,14 +105,13 @@ public class TestPutSolrContentStream { final TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs"); - runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, - "split=/exams" + - "&f=first:/first" + - "&f=last:/last" + - "&f=grade:/grade" + - "&f=subject:/exams/subject" + - "&f=test:/exams/test" + - "&f=marks:/exams/marks"); + runner.setProperty("split", "/exams"); + runner.setProperty("f.1", "first:/first"); + runner.setProperty("f.2", "last:/last"); + runner.setProperty("f.3", "grade:/grade"); + runner.setProperty("f.4", "subject:/exams/subject"); + runner.setProperty("f.5", "test:/exams/test"); + runner.setProperty("f.6", "marks:/exams/marks"); try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { runner.enqueue(fileIn); @@ -134,8 +133,7 @@ public class TestPutSolrContentStream { final TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv"); - runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, - "fieldnames=first,last,grade,subject,test,marks"); + runner.setProperty("fieldnames", "first,last,grade,subject,test,marks"); try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) { runner.enqueue(fileIn); @@ -222,7 +220,6 @@ public class TestPutSolrContentStream { } } - @Test public void testSolrTypeCloudShouldRequireCollection() { final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); @@ -242,15 +239,6 @@ public class TestPutSolrContentStream { runner.assertValid(); } - @Test - public void testRequestParamsShouldBeInvalid() { - final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class); - runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); - runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "a=1&b"); - runner.assertNotValid(); - } - /** * Override the creatrSolrServer method to inject a Mock. */