Addressing feedback from pull request, adding commitWithin parameter to PutSolrContentStream since all update handlers support it

This commit is contained in:
bbende 2015-03-26 22:27:09 -04:00
parent 041f543283
commit 23989609ae
6 changed files with 115 additions and 92 deletions

View File

@ -219,7 +219,7 @@
<nifi.content.repository.archive.enabled>false</nifi.content.repository.archive.enabled>
<nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
<nifi.content.viewer.url />
<nifi.restore.directory />
<nifi.ui.banner.text />
<nifi.ui.autorefresh.interval>30 sec</nifi.ui.autorefresh.interval>

View File

@ -117,7 +117,7 @@ public class GetSolr extends SolrProcessor {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(DEFAULT_COLLECTION);
descriptors.add(COLLECTION);
descriptors.add(SOLR_QUERY);
descriptors.add(RETURN_FIELDS);
descriptors.add(SORT_CLAUSE);
@ -153,14 +153,6 @@ public class GetSolr extends SolrProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
final FlowFile incomingFlowFile = session.get();
if (incomingFlowFile != null) {
session.transfer(incomingFlowFile, REL_SUCCESS);
logger.warn("found FlowFile {} in input queue; transferring to success",
new Object[]{incomingFlowFile});
}
readLastEndDate();
final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
@ -201,7 +193,7 @@ public class GetSolr extends SolrProcessor {
try {
// run the initial query and send out the first page of results
final StopWatch stopWatch = new StopWatch(true);
QueryResponse response = getSolrServer().query(solrQuery);
QueryResponse response = getSolrClient().query(solrQuery);
stopWatch.stop();
long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
@ -218,7 +210,7 @@ public class GetSolr extends SolrProcessor {
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
transitUri.append("/").append(context.getProperty(DEFAULT_COLLECTION).getValue());
transitUri.append("/").append(context.getProperty(COLLECTION).getValue());
}
session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
@ -232,7 +224,7 @@ public class GetSolr extends SolrProcessor {
solrQuery.setStart(endRow);
stopWatch.start();
response = getSolrServer().query(solrQuery);
response = getSolrClient().query(solrQuery);
stopWatch.stop();
duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);

View File

@ -18,9 +18,10 @@
*/
package org.apache.nifi.processors.solr;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@ -48,13 +49,16 @@ import java.util.concurrent.TimeUnit;
@Tags({"Apache", "Solr", "Put", "Send"})
@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value",
description="These parameters will be passed to Solr on the request")
public class PutSolrContentStream extends SolrProcessor {
public static final PropertyDescriptor CONTENT_STREAM_URL = new PropertyDescriptor
.Builder().name("Content Stream URL")
.description("The URL in Solr to post the ContentStream")
public static final PropertyDescriptor CONTENT_STREAM_PATH = new PropertyDescriptor
.Builder().name("Content Stream Path")
.description("The path in Solr to post the ContentStream")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("/update/json/docs")
.build();
@ -63,9 +67,18 @@ public class PutSolrContentStream extends SolrProcessor {
.description("Content-Type being sent to Solr")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("application/json")
.build();
public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor
.Builder().name("Commit Within")
.description("The number of milliseconds before the given update is committed")
.required(false)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor REQUEST_PARAMS = new PropertyDescriptor
.Builder().name("Request Parameters")
.description("Additional parameters to pass to Solr on each request, i.e. key1=val1&key2=val2")
@ -74,8 +87,8 @@ public class PutSolrContentStream extends SolrProcessor {
.defaultValue("json.command=false&split=/&f=id:/field1")
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The original FlowFile")
.build();
@ -89,10 +102,8 @@ public class PutSolrContentStream extends SolrProcessor {
.description("FlowFiles that failed because Solr is unreachable")
.build();
/**
* The name of a FlowFile attribute used for specifying a Solr collection.
*/
public static final String SOLR_COLLECTION_ATTR = "solr.collection";
public static final String COLLECTION_PARAM_NAME = "collection";
public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@ -105,14 +116,15 @@ public class PutSolrContentStream extends SolrProcessor {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SOLR_TYPE);
descriptors.add(SOLR_LOCATION);
descriptors.add(DEFAULT_COLLECTION);
descriptors.add(CONTENT_STREAM_URL);
descriptors.add(COLLECTION);
descriptors.add(CONTENT_STREAM_PATH);
descriptors.add(CONTENT_TYPE);
descriptors.add(COMMIT_WITHIN);
descriptors.add(REQUEST_PARAMS);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_CONNECTION_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
@ -129,7 +141,18 @@ public class PutSolrContentStream extends SolrProcessor {
}
@Override
protected void additionalOnScheduled(ProcessContext context) {
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(true)
.build();
}
@OnScheduled
public void initializeRequestParams(ProcessContext context) {
final String requestParamsVal = context.getProperty(REQUEST_PARAMS).getValue();
this.requestParams = RequestParamsUtil.parse(requestParamsVal);
}
@ -143,17 +166,19 @@ public class PutSolrContentStream extends SolrProcessor {
final ObjectHolder<SolrException> error = new ObjectHolder<>(null);
final ObjectHolder<SolrServerException> connectionError = new ObjectHolder<>(null);
final ObjectHolder<String> collectionUsed = new ObjectHolder<>(null);
final String collectionAttrVal = flowFile.getAttribute(SOLR_COLLECTION_ATTR);
final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
StopWatch timer = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(
context.getProperty(CONTENT_STREAM_URL).getValue());
final String contentStreamPath = context.getProperty(CONTENT_STREAM_PATH)
.evaluateAttributeExpressions().getValue();
ContentStreamUpdateRequest request = new ContentStreamUpdateRequest(contentStreamPath);
request.setParams(new ModifiableSolrParams());
// add the extra params, don't use 'set' in case of repeating params
@ -165,14 +190,13 @@ public class PutSolrContentStream extends SolrProcessor {
}
}
// send the request to the specified collection, or to the default collection
// specify the collection for SolrCloud
if (isSolrCloud) {
String collection = collectionAttrVal;
if (StringUtils.isBlank(collection)) {
collection = context.getProperty(DEFAULT_COLLECTION).getValue();
}
request.setParam("collection", collection);
collectionUsed.set(collection);
request.setParam(COLLECTION_PARAM_NAME, collection);
}
if (commitWithin != null && commitWithin > 0) {
request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
}
try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) {
@ -185,11 +209,11 @@ public class PutSolrContentStream extends SolrProcessor {
@Override
public String getContentType() {
return context.getProperty(CONTENT_TYPE).getValue();
return context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
}
});
UpdateResponse response = request.process(getSolrServer());
UpdateResponse response = request.process(getSolrClient());
getLogger().debug("Got {} response from Solr", new Object[]{response.getStatus()});
} catch (SolrException e) {
error.set(e);
@ -213,14 +237,32 @@ public class PutSolrContentStream extends SolrProcessor {
StringBuilder transitUri = new StringBuilder("solr://");
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
if (isSolrCloud) {
transitUri.append(":").append(collectionUsed.get());
transitUri.append(":").append(collection);
}
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().send(flowFile, transitUri.toString(), duration, true);
getLogger().info("Successfully sent {} to Solr in {} millis", new Object[]{flowFile, duration});
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(flowFile, REL_SUCCESS);
}
}
// get all of the dynamic properties and values into a Map for later adding to the Solr request
private Map<String, String[]> getRequestParams(ProcessContext context, FlowFile flowFile) {
final Map<String,String[]> paramsMap = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
final String paramName = descriptor.getName();
final String paramValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
if (!paramValue.trim().isEmpty()) {
MultiMapSolrParams.addParam(paramName, paramValue, paramsMap);
}
}
}
return paramsMap;
}
}

View File

@ -51,7 +51,6 @@ public abstract class SolrProcessor extends AbstractProcessor {
.Builder().name("Solr Type")
.description("The type of Solr instance, Cloud or Standard.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(SOLR_TYPE_CLOUD, SOLR_TYPE_STANDARD)
.defaultValue(SOLR_TYPE_STANDARD.getValue())
.build();
@ -64,58 +63,48 @@ public abstract class SolrProcessor extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DEFAULT_COLLECTION = new PropertyDescriptor
.Builder().name("Default Collection")
public static final PropertyDescriptor COLLECTION = new PropertyDescriptor
.Builder().name("Collection")
.description("The Solr collection name, only used with a Solr Type of Cloud")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
private volatile SolrClient solrServer;
private volatile SolrClient solrClient;
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
this.solrServer = createSolrServer(context);
additionalOnScheduled(context);
this.solrClient = createSolrClient(context);
}
/**
* Create a SolrServer based on the type of Solr specified.
* Create a SolrClient based on the type of Solr specified.
*
* @param context
* The context
* @return an HttpSolrServer or CloudSolrServer
* @return an HttpSolrClient or CloudSolrClient
*/
protected SolrClient createSolrServer(final ProcessContext context) {
protected SolrClient createSolrClient(final ProcessContext context) {
if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
return new HttpSolrClient(context.getProperty(SOLR_LOCATION).getValue());
} else {
CloudSolrClient cloudSolrServer = new CloudSolrClient(
context.getProperty(SOLR_LOCATION).getValue());
cloudSolrServer.setDefaultCollection(
context.getProperty(DEFAULT_COLLECTION).getValue());
context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
return cloudSolrServer;
}
}
/**
* Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the
* {@link #createSolrServer(org.apache.nifi.processor.ProcessContext)} method
* {@link #createSolrClient(org.apache.nifi.processor.ProcessContext)} method
*
* @return
*/
protected final SolrClient getSolrServer() {
return solrServer;
}
/**
* Allows additional action to be taken during scheduling of processor.
*
* @param context
* The context
*/
protected void additionalOnScheduled(final ProcessContext context) {
protected final SolrClient getSolrClient() {
return solrClient;
}
@Override
@ -123,10 +112,10 @@ public abstract class SolrProcessor extends AbstractProcessor {
final List<ValidationResult> problems = new ArrayList<>();
if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
final String collection = context.getProperty(DEFAULT_COLLECTION).getValue();
final String collection = context.getProperty(COLLECTION).getValue();
if (collection == null || collection.trim().isEmpty()) {
problems.add(new ValidationResult.Builder()
.subject(DEFAULT_COLLECTION.getName())
.subject(COLLECTION.getName())
.input(collection).valid(false)
.explanation("A collection must specified for Solr Type of Cloud")
.build());

View File

@ -183,7 +183,7 @@ public class TestGetSolr {
}
// Override createSolrServer and return the passed in SolrClient
// Override createSolrClient and return the passed in SolrClient
private class TestableProcessor extends GetSolr {
private SolrClient solrClient;
@ -191,7 +191,7 @@ public class TestGetSolr {
this.solrClient = solrClient;
}
@Override
protected SolrClient createSolrServer(ProcessContext context) {
protected SolrClient createSolrClient(ProcessContext context) {
return solrClient;
}
}

View File

@ -82,7 +82,7 @@ public class TestPutSolrContentStream {
final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs");
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs");
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,"json.command=false");
try (FileInputStream fileIn = new FileInputStream(SOLR_JSON_MULTIPLE_DOCS_FILE)) {
@ -91,11 +91,11 @@ public class TestPutSolrContentStream {
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2));
verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
try { proc.getSolrClient().shutdown(); } catch (Exception e) { }
}
}
@ -104,7 +104,7 @@ public class TestPutSolrContentStream {
final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/json/docs");
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/json/docs");
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
"split=/exams" +
"&f=first:/first" +
@ -120,11 +120,11 @@ public class TestPutSolrContentStream {
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2));
verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
try { proc.getSolrClient().shutdown(); } catch (Exception e) { }
}
}
@ -133,7 +133,7 @@ public class TestPutSolrContentStream {
final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update/csv");
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv");
runner.setProperty(PutSolrContentStream.REQUEST_PARAMS,
"fieldnames=first,last,grade,subject,test,marks");
@ -143,11 +143,11 @@ public class TestPutSolrContentStream {
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2));
verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
try { proc.getSolrClient().shutdown(); } catch (Exception e) { }
}
}
@ -156,7 +156,7 @@ public class TestPutSolrContentStream {
final EmbeddedSolrServerProcessor proc = new EmbeddedSolrServerProcessor(DEFAULT_SOLR_CORE);
final TestRunner runner = createDefaultTestRunner(proc);
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_URL, "/update");
runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update");
runner.setProperty(PutSolrContentStream.CONTENT_TYPE, "application/xml");
try (FileInputStream fileIn = new FileInputStream(XML_MULTIPLE_DOCS_FILE)) {
@ -165,11 +165,11 @@ public class TestPutSolrContentStream {
runner.run();
runner.assertTransferCount(PutSolrContentStream.REL_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_CONNECTION_FAILURE, 0);
runner.assertTransferCount(PutSolrContentStream.REL_ORIGINAL, 1);
runner.assertTransferCount(PutSolrContentStream.REL_SUCCESS, 1);
verifySolrDocuments(proc.getSolrServer(), Arrays.asList(expectedDoc1, expectedDoc2));
verifySolrDocuments(proc.getSolrClient(), Arrays.asList(expectedDoc1, expectedDoc2));
} finally {
try { proc.getSolrServer().shutdown(); } catch (Exception e) { }
try { proc.getSolrClient().shutdown(); } catch (Exception e) { }
}
}
@ -185,7 +185,7 @@ public class TestPutSolrContentStream {
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_CONNECTION_FAILURE, 1);
verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class));
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class));
}
}
@ -201,7 +201,7 @@ public class TestPutSolrContentStream {
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class));
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class));
}
}
@ -218,7 +218,7 @@ public class TestPutSolrContentStream {
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_FAILURE, 1);
verify(proc.getSolrServer(), times(1)).request(any(SolrRequest.class));
verify(proc.getSolrClient(), times(1)).request(any(SolrRequest.class));
}
}
@ -230,7 +230,7 @@ public class TestPutSolrContentStream {
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
runner.assertNotValid();
runner.setProperty(PutSolrContentStream.DEFAULT_COLLECTION, "someCollection1");
runner.setProperty(PutSolrContentStream.COLLECTION, "someCollection1");
runner.assertValid();
}
@ -264,7 +264,7 @@ public class TestPutSolrContentStream {
}
@Override
protected SolrClient createSolrServer(ProcessContext context) {
protected SolrClient createSolrClient(ProcessContext context) {
mockSolrServer = Mockito.mock(SolrClient.class);
try {
when(mockSolrServer.request(any(SolrRequest.class))).thenThrow(throwable);
@ -279,7 +279,7 @@ public class TestPutSolrContentStream {
}
/**
* Override the createSolrServer method and create and EmbeddedSolrServer.
* Override the createSolrClient method and create and EmbeddedSolrServer.
*/
private class EmbeddedSolrServerProcessor extends PutSolrContentStream {
@ -291,7 +291,7 @@ public class TestPutSolrContentStream {
}
@Override
protected SolrClient createSolrServer(ProcessContext context) {
protected SolrClient createSolrClient(ProcessContext context) {
try {
String relPath = getClass().getProtectionDomain()
.getCodeSource().getLocation().getFile()