mirror of https://github.com/apache/nifi.git
NIFI-3220 Add support for basic auth to Solr processors
This closes #1338. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
5ea17d30c5
commit
6ca25e10ef
|
@ -60,6 +60,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import org.apache.nifi.util.StopWatch;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||
import org.apache.solr.common.SolrDocument;
|
||||
|
@ -143,6 +144,8 @@ public class GetSolr extends SolrProcessor {
|
|||
descriptors.add(DATE_FIELD);
|
||||
descriptors.add(BATCH_SIZE);
|
||||
descriptors.add(JAAS_CLIENT_APP_NAME);
|
||||
descriptors.add(BASIC_USERNAME);
|
||||
descriptors.add(BASIC_PASSWORD);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(SOLR_SOCKET_TIMEOUT);
|
||||
descriptors.add(SOLR_CONNECTION_TIMEOUT);
|
||||
|
@ -226,9 +229,14 @@ public class GetSolr extends SolrProcessor {
|
|||
}
|
||||
|
||||
try {
|
||||
final QueryRequest req = new QueryRequest(solrQuery);
|
||||
if (isBasicAuthEnabled()) {
|
||||
req.setBasicAuthCredentials(getUsername(), getPassword());
|
||||
}
|
||||
|
||||
// run the initial query and send out the first page of results
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
QueryResponse response = getSolrClient().query(solrQuery);
|
||||
QueryResponse response = req.process(getSolrClient());
|
||||
stopWatch.stop();
|
||||
|
||||
long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
|
||||
|
@ -244,7 +252,7 @@ public class GetSolr extends SolrProcessor {
|
|||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
StringBuilder transitUri = new StringBuilder("solr://");
|
||||
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
|
||||
transitUri.append(getSolrLocation());
|
||||
if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
|
||||
transitUri.append("/").append(context.getProperty(COLLECTION).getValue());
|
||||
}
|
||||
|
|
|
@ -18,22 +18,6 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.solr;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
|
@ -57,6 +41,22 @@ import org.apache.solr.common.params.ModifiableSolrParams;
|
|||
import org.apache.solr.common.params.MultiMapSolrParams;
|
||||
import org.apache.solr.common.util.ContentStreamBase;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Tags({"Apache", "Solr", "Put", "Send"})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr")
|
||||
|
@ -125,6 +125,8 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||
descriptors.add(CONTENT_TYPE);
|
||||
descriptors.add(COMMIT_WITHIN);
|
||||
descriptors.add(JAAS_CLIENT_APP_NAME);
|
||||
descriptors.add(BASIC_USERNAME);
|
||||
descriptors.add(BASIC_PASSWORD);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(SOLR_SOCKET_TIMEOUT);
|
||||
descriptors.add(SOLR_CONNECTION_TIMEOUT);
|
||||
|
@ -206,6 +208,11 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||
request.setParam(COMMIT_WITHIN_PARAM_NAME, commitWithin.toString());
|
||||
}
|
||||
|
||||
// if a username and password were provided then pass them for basic auth
|
||||
if (isBasicAuthEnabled()) {
|
||||
request.setBasicAuthCredentials(getUsername(), getPassword());
|
||||
}
|
||||
|
||||
try (final BufferedInputStream bufferedIn = new BufferedInputStream(in)) {
|
||||
// add the FlowFile's content on the UpdateRequest
|
||||
request.addContentStream(new ContentStreamBase() {
|
||||
|
@ -248,7 +255,7 @@ public class PutSolrContentStream extends SolrProcessor {
|
|||
session.transfer(flowFile, REL_CONNECTION_FAILURE);
|
||||
} else {
|
||||
StringBuilder transitUri = new StringBuilder("solr://");
|
||||
transitUri.append(context.getProperty(SOLR_LOCATION).getValue());
|
||||
transitUri.append(getSolrLocation());
|
||||
if (isSolrCloud) {
|
||||
transitUri.append(":").append(collection);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.nifi.components.AllowableValue;
|
|||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
@ -73,6 +74,8 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
"or the ZooKeeper hosts for a Solr Type of Cloud (ex: localhost:9983).")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor COLLECTION = new PropertyDescriptor
|
||||
|
@ -92,6 +95,25 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor BASIC_USERNAME = new PropertyDescriptor
|
||||
.Builder().name("Username")
|
||||
.description("The username to use when Solr is configured with basic authentication.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor BASIC_PASSWORD = new PropertyDescriptor
|
||||
.Builder().name("Password")
|
||||
.description("The password to use when Solr is configured with basic authentication.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(true)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("The Controller Service to use in order to obtain an SSL Context. This property must be set when communicating with a Solr over https.")
|
||||
|
@ -148,10 +170,20 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
.build();
|
||||
|
||||
private volatile SolrClient solrClient;
|
||||
private volatile String solrLocation;
|
||||
private volatile String basicUsername;
|
||||
private volatile String basicPassword;
|
||||
private volatile boolean basicAuthEnabled = false;
|
||||
|
||||
@OnScheduled
|
||||
public final void onScheduled(final ProcessContext context) throws IOException {
|
||||
this.solrClient = createSolrClient(context);
|
||||
this.solrLocation = context.getProperty(SOLR_LOCATION).evaluateAttributeExpressions().getValue();
|
||||
this.basicUsername = context.getProperty(BASIC_USERNAME).evaluateAttributeExpressions().getValue();
|
||||
this.basicPassword = context.getProperty(BASIC_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
if (!StringUtils.isBlank(basicUsername) && !StringUtils.isBlank(basicPassword)) {
|
||||
basicAuthEnabled = true;
|
||||
}
|
||||
this.solrClient = createSolrClient(context, solrLocation);
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
|
@ -172,8 +204,7 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
* The context
|
||||
* @return an HttpSolrClient or CloudSolrClient
|
||||
*/
|
||||
protected SolrClient createSolrClient(final ProcessContext context) {
|
||||
final String solrLocation = context.getProperty(SOLR_LOCATION).getValue();
|
||||
protected SolrClient createSolrClient(final ProcessContext context, final String solrLocation) {
|
||||
final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
|
||||
|
@ -219,7 +250,7 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
|
||||
/**
|
||||
* Returns the {@link org.apache.solr.client.solrj.SolrClient} that was created by the
|
||||
* {@link #createSolrClient(org.apache.nifi.processor.ProcessContext)} method
|
||||
* {@link #createSolrClient(org.apache.nifi.processor.ProcessContext, String)} method
|
||||
*
|
||||
* @return an HttpSolrClient or CloudSolrClient
|
||||
*/
|
||||
|
@ -227,6 +258,22 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
return solrClient;
|
||||
}
|
||||
|
||||
protected final String getSolrLocation() {
|
||||
return solrLocation;
|
||||
}
|
||||
|
||||
protected final String getUsername() {
|
||||
return basicUsername;
|
||||
}
|
||||
|
||||
protected final String getPassword() {
|
||||
return basicPassword;
|
||||
}
|
||||
|
||||
protected final boolean isBasicAuthEnabled() {
|
||||
return basicAuthEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
final List<ValidationResult> problems = new ArrayList<>();
|
||||
|
@ -268,7 +315,7 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
// For solr cloud the location will be the ZooKeeper host:port so we can't validate the SSLContext, but for standard solr
|
||||
// we can validate if the url starts with https we need an SSLContextService, if it starts with http we can't have an SSLContextService
|
||||
if (SOLR_TYPE_STANDARD.equals(context.getProperty(SOLR_TYPE).getValue())) {
|
||||
final String solrLocation = context.getProperty(SOLR_LOCATION).getValue();
|
||||
final String solrLocation = context.getProperty(SOLR_LOCATION).evaluateAttributeExpressions().getValue();
|
||||
if (solrLocation != null) {
|
||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
if (solrLocation.startsWith("https:") && sslContextService == null) {
|
||||
|
@ -287,6 +334,26 @@ public abstract class SolrProcessor extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
// Validate that we username and password are provided together, or that neither are provided
|
||||
final String username = context.getProperty(BASIC_USERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String password = context.getProperty(BASIC_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
|
||||
if (!StringUtils.isBlank(username) && StringUtils.isBlank(password)) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject(BASIC_PASSWORD.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("a password must be provided for the given username")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (!StringUtils.isBlank(password) && StringUtils.isBlank(username)) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject(BASIC_USERNAME.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("a username must be provided for the given password")
|
||||
.build());
|
||||
}
|
||||
|
||||
Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context);
|
||||
if (otherProblems != null) {
|
||||
problems.addAll(otherProblems);
|
||||
|
|
|
@ -214,7 +214,7 @@ public class TestGetSolr {
|
|||
this.solrClient = solrClient;
|
||||
}
|
||||
@Override
|
||||
protected SolrClient createSolrClient(ProcessContext context) {
|
||||
protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
|
||||
return solrClient;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -392,6 +392,35 @@ public class TestPutSolrContentStream {
|
|||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUsernamePasswordValidation() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
|
||||
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue());
|
||||
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "http://localhost:8443/solr");
|
||||
runner.assertValid();
|
||||
|
||||
runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "user1");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(PutSolrContentStream.BASIC_PASSWORD, "password");
|
||||
runner.assertValid();
|
||||
|
||||
runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(PutSolrContentStream.BASIC_USERNAME, "${solr.user}");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setVariable("solr.user", "solrRocks");
|
||||
runner.assertValid();
|
||||
|
||||
runner.setProperty(PutSolrContentStream.BASIC_PASSWORD, "${solr.password}");
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setVariable("solr.password", "solrRocksPassword");
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJAASClientAppNameValidation() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(PutSolrContentStream.class);
|
||||
|
@ -492,7 +521,7 @@ public class TestPutSolrContentStream {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected SolrClient createSolrClient(ProcessContext context) {
|
||||
protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
|
||||
mockSolrClient = new SolrClient() {
|
||||
@Override
|
||||
public NamedList<Object> request(SolrRequest solrRequest, String s) throws SolrServerException, IOException {
|
||||
|
@ -522,7 +551,7 @@ public class TestPutSolrContentStream {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected SolrClient createSolrClient(ProcessContext context) {
|
||||
protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
|
||||
mockSolrClient = Mockito.mock(SolrClient.class);
|
||||
try {
|
||||
when(mockSolrClient.request(any(SolrRequest.class),
|
||||
|
@ -545,7 +574,7 @@ public class TestPutSolrContentStream {
|
|||
this.solrClient = solrClient;
|
||||
}
|
||||
@Override
|
||||
protected SolrClient createSolrClient(ProcessContext context) {
|
||||
protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
|
||||
return solrClient;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue