mirror of
https://github.com/apache/nifi.git
synced 2025-02-16 23:15:36 +00:00
Adding support for dynamic properties to PutSolrContentStream
This commit is contained in:
parent
bd6159e97f
commit
5b3709fe0c
@ -79,14 +79,6 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||||||
.expressionLanguageSupported(true)
|
.expressionLanguageSupported(true)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor REQUEST_PARAMS = new PropertyDescriptor
|
|
||||||
.Builder().name("Request Parameters")
|
|
||||||
.description("Additional parameters to pass to Solr on each request, i.e. key1=val1&key2=val2")
|
|
||||||
.required(false)
|
|
||||||
.addValidator(RequestParamsUtil.getValidator())
|
|
||||||
.defaultValue("json.command=false&split=/&f=id:/field1")
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
.name("success")
|
.name("success")
|
||||||
.description("The original FlowFile")
|
.description("The original FlowFile")
|
||||||
@ -104,10 +96,10 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||||||
|
|
||||||
public static final String COLLECTION_PARAM_NAME = "collection";
|
public static final String COLLECTION_PARAM_NAME = "collection";
|
||||||
public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
|
public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
|
||||||
|
public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
|
||||||
|
|
||||||
private Set<Relationship> relationships;
|
private Set<Relationship> relationships;
|
||||||
private List<PropertyDescriptor> descriptors;
|
private List<PropertyDescriptor> descriptors;
|
||||||
private volatile MultiMapSolrParams requestParams;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void init(final ProcessorInitializationContext context) {
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
@ -120,7 +112,6 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||||||
descriptors.add(CONTENT_STREAM_PATH);
|
descriptors.add(CONTENT_STREAM_PATH);
|
||||||
descriptors.add(CONTENT_TYPE);
|
descriptors.add(CONTENT_TYPE);
|
||||||
descriptors.add(COMMIT_WITHIN);
|
descriptors.add(COMMIT_WITHIN);
|
||||||
descriptors.add(REQUEST_PARAMS);
|
|
||||||
this.descriptors = Collections.unmodifiableList(descriptors);
|
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||||
|
|
||||||
final Set<Relationship> relationships = new HashSet<>();
|
final Set<Relationship> relationships = new HashSet<>();
|
||||||
@ -151,12 +142,6 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnScheduled
|
|
||||||
public void initializeRequestParams(ProcessContext context) {
|
|
||||||
final String requestParamsVal = context.getProperty(REQUEST_PARAMS).getValue();
|
|
||||||
this.requestParams = RequestParamsUtil.parse(requestParamsVal);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
FlowFile flowFile = session.get();
|
FlowFile flowFile = session.get();
|
||||||
@ -171,6 +156,8 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||||||
final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
|
final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
|
||||||
|
|
||||||
|
final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));
|
||||||
|
|
||||||
StopWatch timer = new StopWatch(true);
|
StopWatch timer = new StopWatch(true);
|
||||||
session.read(flowFile, new InputStreamCallback() {
|
session.read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
@ -250,6 +237,7 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||||||
// get all of the dynamic properties and values into a Map for later adding to the Solr request
|
// get all of the dynamic properties and values into a Map for later adding to the Solr request
|
||||||
private Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
|
private Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
|
||||||
final Map<String,String[]> paramsMap = new HashMap<>();
|
final Map<String,String[]> paramsMap = new HashMap<>();
|
||||||
|
final SortedMap<String,String> repeatingParams = new TreeMap<>();
|
||||||
|
|
||||||
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
||||||
final PropertyDescriptor descriptor = entry.getKey();
|
final PropertyDescriptor descriptor = entry.getKey();
|
||||||
@ -258,10 +246,22 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||||||
final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
|
final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
|
||||||
if (!paramValue.trim().isEmpty()) {
|
if (!paramValue.trim().isEmpty()) {
|
||||||
MultiMapSolrParams.addParam(paramName, paramValue, paramsMap);
|
if (paramName.matches(REPEATING_PARAM_PATTERN)) {
|
||||||
|
repeatingParams.put(paramName, paramValue);
|
||||||
|
} else {
|
||||||
|
MultiMapSolrParams.addParam(paramName, paramValue, paramsMap);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (final Map.Entry<String,String> entry : repeatingParams.entrySet()) {
|
||||||
|
final String paramName = entry.getKey();
|
||||||
|
final String paramValue = entry.getValue();
|
||||||
|
final int idx = paramName.lastIndexOf(".");
|
||||||
|
MultiMapSolrParams.addParam(paramName.substring(0, idx), paramValue, paramsMap);
|
||||||
|
}
|
||||||
|
|
||||||
return paramsMap;
|
return paramsMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,85 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.solr;
|
|
||||||
|
|
||||||
import org.apache.nifi.components.ValidationContext;
|
|
||||||
import org.apache.nifi.components.ValidationResult;
|
|
||||||
import org.apache.nifi.components.Validator;
|
|
||||||
import org.apache.solr.common.params.MultiMapSolrParams;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class RequestParamsUtil {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Parses a String of request params into a MultiMap.
|
|
||||||
*
|
|
||||||
* @param requestParams
|
|
||||||
* the value of the request params property
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
public static MultiMapSolrParams parse(final String requestParams) {
|
|
||||||
final Map<String,String[]> paramsMap = new HashMap<>();
|
|
||||||
if (requestParams == null || requestParams.trim().isEmpty()) {
|
|
||||||
return new MultiMapSolrParams(paramsMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
final String[] params = requestParams.split("[&]");
|
|
||||||
if (params == null || params.length == 0) {
|
|
||||||
throw new IllegalStateException(
|
|
||||||
"Parameters must be in form k1=v1&k2=v2, was" + requestParams);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final String param : params) {
|
|
||||||
final String[] keyVal = param.split("=");
|
|
||||||
if (keyVal.length != 2) {
|
|
||||||
throw new IllegalStateException(
|
|
||||||
"Parameter must be in form key=value, was " + param);
|
|
||||||
}
|
|
||||||
|
|
||||||
final String key = keyVal[0].trim();
|
|
||||||
final String val = keyVal[1].trim();
|
|
||||||
MultiMapSolrParams.addParam(key, val, paramsMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new MultiMapSolrParams(paramsMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a property validator for a request params string.
|
|
||||||
*
|
|
||||||
* @return valid if the input parses successfully, invalid otherwise
|
|
||||||
*/
|
|
||||||
public static Validator getValidator() {
|
|
||||||
return new Validator() {
|
|
||||||
@Override
|
|
||||||
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
|
||||||
try {
|
|
||||||
RequestParamsUtil.parse(input);
|
|
||||||
return new ValidationResult.Builder().subject(subject).input(input)
|
|
||||||
.explanation("Valid Params").valid(true).build();
|
|
||||||
} catch (final Exception e) {
|
|
||||||
return new ValidationResult.Builder().subject(subject).input(input)
|
|
||||||
.explanation("Invalid Params" + e.getMessage()).valid(false).build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,48 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<head>
|
||||||
|
<meta charset="utf-8" />
|
||||||
|
<title>PutSolrContentStream</title>
|
||||||
|
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
|
||||||
|
</head>
|
||||||
|
|
||||||
|
<body>
|
||||||
|
<h2>Usage Example</h2>
|
||||||
|
<p>
|
||||||
|
This processor streams the contents of a FlowFile to an Apache Solr
|
||||||
|
update handler. Any properties added to this processor by the user are
|
||||||
|
passed to Solr on the update request. If a parameter must be sent multiple
|
||||||
|
times with different values, properties can follow a naming convention:
|
||||||
|
name.number, where name is the parameter name and number is a unique number.
|
||||||
|
Repeating parameters will be sorted by their property name.
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
Example: To specify multiple 'f' parameters for indexing custom json, the following
|
||||||
|
properties can be defined:
|
||||||
|
</p>
|
||||||
|
<ul>
|
||||||
|
<li><strong>split</strong>: /exams</li>
|
||||||
|
<li><strong>f.1</strong>: first:/first</li>
|
||||||
|
<li><strong>f.2</strong>: last:/last</li>
|
||||||
|
<li><strong>f.3</strong>: grade:/grade</li>
|
||||||
|
</ul>
|
||||||
|
<p>
|
||||||
|
This will result in sending the following url to Solr: </br>
|
||||||
|
split=/exams&f=first:/first&f=last:/last&f=grade:/grade
|
||||||
|
</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
@ -1,49 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.solr;
|
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.apache.solr.common.params.MultiMapSolrParams;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
|
|
||||||
public class RequestParamsUtilTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSimpleParse() {
|
|
||||||
MultiMapSolrParams map = RequestParamsUtil.parse("a=1&b=2&c=3");
|
|
||||||
Assert.assertEquals("1", map.get("a"));
|
|
||||||
Assert.assertEquals("2", map.get("b"));
|
|
||||||
Assert.assertEquals("3", map.get("c"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParseWithSpaces() {
|
|
||||||
MultiMapSolrParams map = RequestParamsUtil.parse("a = 1 &b= 2& c= 3 ");
|
|
||||||
Assert.assertEquals("1", map.get("a"));
|
|
||||||
Assert.assertEquals("2", map.get("b"));
|
|
||||||
Assert.assertEquals("3", map.get("c"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = IllegalStateException.class)
|
|
||||||
public void testMalformedParamsParse() {
|
|
||||||
RequestParamsUtil.parse("a=1&b&c=3");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -83,7 +83,7 @@ public class TestPutSolrContentStream {
|
|||||||
|
|
||||||
final TestRunner runner = createDefaultTestRunner(proc);
|
final TestRunner runner = createDefaultTestRunner(proc);
|
||||||
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs");
|
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs");
|
||||||
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,"json.command=false");
|
runner.setProperty("json.command", "false");
|
||||||
|
|
||||||
try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
|
try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
|
||||||
runner.enqueue(fileIn);
|
runner.enqueue(fileIn);
|
||||||
@ -105,14 +105,13 @@ public class TestPutSolrContentStream {
|
|||||||
|
|
||||||
final TestRunner runner = createDefaultTestRunner(proc);
|
final TestRunner runner = createDefaultTestRunner(proc);
|
||||||
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs");
|
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs");
|
||||||
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
|
runner.setProperty("split", "/exams");
|
||||||
"split=/exams" +
|
runner.setProperty("f.1", "first:/first");
|
||||||
"&f=first:/first" +
|
runner.setProperty("f.2", "last:/last");
|
||||||
"&f=last:/last" +
|
runner.setProperty("f.3", "grade:/grade");
|
||||||
"&f=grade:/grade" +
|
runner.setProperty("f.4", "subject:/exams/subject");
|
||||||
"&f=subject:/exams/subject" +
|
runner.setProperty("f.5", "test:/exams/test");
|
||||||
"&f=test:/exams/test" +
|
runner.setProperty("f.6", "marks:/exams/marks");
|
||||||
"&f=marks:/exams/marks");
|
|
||||||
|
|
||||||
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
|
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
|
||||||
runner.enqueue(fileIn);
|
runner.enqueue(fileIn);
|
||||||
@ -134,8 +133,7 @@ public class TestPutSolrContentStream {
|
|||||||
|
|
||||||
final TestRunner runner = createDefaultTestRunner(proc);
|
final TestRunner runner = createDefaultTestRunner(proc);
|
||||||
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv");
|
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv");
|
||||||
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
|
runner.setProperty("fieldnames", "first,last,grade,subject,test,marks");
|
||||||
"fieldnames=first,last,grade,subject,test,marks");
|
|
||||||
|
|
||||||
try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
|
try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) {
|
||||||
runner.enqueue(fileIn);
|
runner.enqueue(fileIn);
|
||||||
@ -222,7 +220,6 @@ public class TestPutSolrContentStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSolrTypeCloudShouldRequireCollection() {
|
public void testSolrTypeCloudShouldRequireCollection() {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
|
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
|
||||||
@ -242,15 +239,6 @@ public class TestPutSolrContentStream {
|
|||||||
runner.assertValid();
|
runner.assertValid();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRequestParamsShouldBeInvalid() {
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
|
|
||||||
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
|
|
||||||
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
|
|
||||||
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS, "a=1&b");
|
|
||||||
runner.assertNotValid();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override the creatrSolrServer method to inject a Mock.
|
* Override the creatrSolrServer method to inject a Mock.
|
||||||
*/
|
*/
|
||||||
|
Loading…
x
Reference in New Issue
Block a user