mirror of https://github.com/apache/nifi.git
NIFI-7294 Address deprecation issues in solrj and httpclient
Some calls to deprecated methods in httpclient were resulting in UnsupportedOperationException. Use the new API calls in both httpclient and solrj. Add an integration test to include test coverage for org.apache.nifi.processors.solr.SolrUtils.createClient This closes #4171.
This commit is contained in:
parent
992d990dd2
commit
fee1b8b8e0
|
@ -21,8 +21,11 @@ package org.apache.nifi.processors.solr;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.http.client.HttpClient;
|
import org.apache.http.client.HttpClient;
|
||||||
import org.apache.http.conn.scheme.Scheme;
|
import org.apache.http.config.Registry;
|
||||||
import org.apache.http.conn.ssl.SSLSocketFactory;
|
import org.apache.http.config.RegistryBuilder;
|
||||||
|
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
||||||
|
import org.apache.http.conn.socket.ConnectionSocketFactory;
|
||||||
|
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.context.PropertyContext;
|
import org.apache.nifi.context.PropertyContext;
|
||||||
|
@ -230,7 +233,7 @@ public class SolrUtils {
|
||||||
|
|
||||||
public static final String REPEATING_PARAM_PATTERN = "[\\w\\.]+\\.\\d+$";
|
public static final String REPEATING_PARAM_PATTERN = "[\\w\\.]+\\.\\d+$";
|
||||||
|
|
||||||
public static SolrClient createSolrClient(final PropertyContext context, final String solrLocation) {
|
public static synchronized SolrClient createSolrClient(final PropertyContext context, final String solrLocation) {
|
||||||
final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||||
final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||||
final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
|
final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
|
||||||
|
@ -240,25 +243,35 @@ public class SolrUtils {
|
||||||
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||||
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||||
|
|
||||||
final ModifiableSolrParams params = new ModifiableSolrParams();
|
// Reset HttpClientBuilder static values
|
||||||
params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout);
|
HttpClientUtil.resetHttpClientBuilder();
|
||||||
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
|
|
||||||
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
|
|
||||||
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost);
|
|
||||||
|
|
||||||
// has to happen before the client is created below so that correct configurer would be set if needed
|
// has to happen before the client is created below so that correct configurer would be set if needed
|
||||||
if (kerberosCredentialsService != null || (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword))) {
|
if (kerberosCredentialsService != null || (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword))) {
|
||||||
HttpClientUtil.setHttpClientBuilder(new KerberosHttpClientBuilder().getHttpClientBuilder(Optional.empty()));
|
HttpClientUtil.setHttpClientBuilder(new KerberosHttpClientBuilder().getHttpClientBuilder(Optional.empty()));
|
||||||
}
|
}
|
||||||
|
|
||||||
final HttpClient httpClient = HttpClientUtil.createClient(params);
|
|
||||||
|
|
||||||
if (sslContextService != null) {
|
if (sslContextService != null) {
|
||||||
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
|
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
|
||||||
final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext);
|
final SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext);
|
||||||
final Scheme httpsScheme = new Scheme("https", 443, sslSocketFactory);
|
HttpClientUtil.setSchemaRegistryProvider(new HttpClientUtil.SchemaRegistryProvider() {
|
||||||
httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme);
|
@Override
|
||||||
|
public Registry<ConnectionSocketFactory> getSchemaRegistry() {
|
||||||
|
RegistryBuilder<ConnectionSocketFactory> builder = RegistryBuilder.create();
|
||||||
|
builder.register("http", PlainConnectionSocketFactory.getSocketFactory());
|
||||||
|
builder.register("https", sslSocketFactory);
|
||||||
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
final ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
|
params.set(HttpClientUtil.PROP_SO_TIMEOUT, socketTimeout);
|
||||||
|
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connectionTimeout);
|
||||||
|
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, maxConnections);
|
||||||
|
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, maxConnectionsPerHost);
|
||||||
|
|
||||||
|
final HttpClient httpClient = HttpClientUtil.createClient(params);
|
||||||
|
|
||||||
if (SOLR_TYPE_STANDARD.getValue().equals(context.getProperty(SOLR_TYPE).getValue())) {
|
if (SOLR_TYPE_STANDARD.getValue().equals(context.getProperty(SOLR_TYPE).getValue())) {
|
||||||
return new HttpSolrClient.Builder(solrLocation).withHttpClient(httpClient).build();
|
return new HttpSolrClient.Builder(solrLocation).withHttpClient(httpClient).build();
|
||||||
|
|
|
@ -20,10 +20,13 @@
|
||||||
package org.apache.nifi.processors.solr;
|
package org.apache.nifi.processors.solr;
|
||||||
|
|
||||||
import com.google.gson.stream.JsonReader;
|
import com.google.gson.stream.JsonReader;
|
||||||
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.json.JsonRecordSetWriter;
|
import org.apache.nifi.json.JsonRecordSetWriter;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
||||||
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
@ -36,8 +39,10 @@ import org.apache.solr.common.SolrInputDocument;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.xmlunit.matchers.CompareMatcher;
|
import org.xmlunit.matchers.CompareMatcher;
|
||||||
|
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
|
@ -45,16 +50,18 @@ import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
|
|
||||||
public class QuerySolrIT {
|
public class QuerySolrIT {
|
||||||
/*
|
/*
|
||||||
|
@ -117,7 +124,7 @@ public class QuerySolrIT {
|
||||||
CloudSolrClient solrClient = null;
|
CloudSolrClient solrClient = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
solrClient = new CloudSolrClient.Builder().withZkHost(SOLR_LOCATION).build();
|
solrClient = new CloudSolrClient.Builder(Collections.singletonList(SOLR_LOCATION), Optional.empty()).build();
|
||||||
solrClient.setDefaultCollection(SOLR_COLLECTION);
|
solrClient.setDefaultCollection(SOLR_COLLECTION);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
@ -141,7 +148,7 @@ public class QuerySolrIT {
|
||||||
|
|
||||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
|
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
|
||||||
runner.setProperty(SolrUtils.SOLR_LOCATION, "localhost:2181");
|
runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_LOCATION);
|
||||||
runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION);
|
runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION);
|
||||||
|
|
||||||
return runner;
|
return runner;
|
||||||
|
@ -627,6 +634,24 @@ public class QuerySolrIT {
|
||||||
assertEquals(controlScore, 45);
|
assertEquals(controlScore, 45);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSslContextService() throws IOException, InitializationException {
|
||||||
|
final QuerySolr proc = Mockito.mock(QuerySolr.class);
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
|
||||||
|
runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_LOCATION);
|
||||||
|
runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION);
|
||||||
|
|
||||||
|
final SSLContextService sslContextService = new MockSSLContextService();
|
||||||
|
runner.addControllerService("ssl-context", sslContextService);
|
||||||
|
runner.enableControllerService(sslContextService);
|
||||||
|
|
||||||
|
runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
|
||||||
|
proc.onScheduled(runner.getProcessContext());
|
||||||
|
Mockito.verify(proc, Mockito.times(1)).createSolrClient(Mockito.any(ProcessContext.class), Mockito.eq((String)SOLR_LOCATION));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Override createSolrClient and return the passed in SolrClient
|
// Override createSolrClient and return the passed in SolrClient
|
||||||
private class TestableProcessor extends QuerySolr {
|
private class TestableProcessor extends QuerySolr {
|
||||||
private SolrClient solrClient;
|
private SolrClient solrClient;
|
||||||
|
@ -639,4 +664,65 @@ public class QuerySolrIT {
|
||||||
return solrClient;
|
return solrClient;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mock implementation so we don't need to have a real keystore/truststore available for testing.
|
||||||
|
*/
|
||||||
|
private class MockSSLContextService extends AbstractControllerService implements SSLContextService {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SSLContext createSSLContext(ClientAuth clientAuth) throws ProcessException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTrustStoreFile() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTrustStoreType() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTrustStorePassword() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isTrustStoreConfigured() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getKeyStoreFile() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getKeyStoreType() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getKeyStorePassword() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getKeyPassword() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isKeyStoreConfigured() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getSslAlgorithm() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,8 +48,8 @@ import java.util.Date;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
|
|
||||||
public class TestGetSolr {
|
public class TestGetSolr {
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.solr.client.solrj.SolrClient;
|
||||||
import org.apache.solr.client.solrj.SolrQuery;
|
import org.apache.solr.client.solrj.SolrQuery;
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
|
||||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
import org.apache.solr.common.SolrDocument;
|
import org.apache.solr.common.SolrDocument;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
|
@ -318,7 +318,7 @@ public class TestPutSolrContentStream {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException {
|
public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException {
|
||||||
final Throwable throwable = new HttpSolrClient.RemoteSolrException(
|
final Throwable throwable = new BaseHttpSolrClient.RemoteSolrException(
|
||||||
"host", 401, "error", new NumberFormatException());
|
"host", 401, "error", new NumberFormatException());
|
||||||
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
|
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.solr.client.solrj.SolrClient;
|
||||||
import org.apache.solr.client.solrj.SolrQuery;
|
import org.apache.solr.client.solrj.SolrQuery;
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
|
||||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
import org.apache.solr.common.SolrDocument;
|
import org.apache.solr.common.SolrDocument;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
|
@ -440,7 +440,7 @@ public class TestPutSolrRecord {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException, InitializationException {
|
public void testRemoteSolrExceptionShouldRouteToFailure() throws IOException, SolrServerException, InitializationException {
|
||||||
final Throwable throwable = new HttpSolrClient.RemoteSolrException(
|
final Throwable throwable = new BaseHttpSolrClient.RemoteSolrException(
|
||||||
"host", 401, "error", new NumberFormatException());
|
"host", 401, "error", new NumberFormatException());
|
||||||
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
|
final ExceptionThrowingProcessor proc = new ExceptionThrowingProcessor(throwable);
|
||||||
|
|
||||||
|
|
|
@ -48,9 +48,9 @@ import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
|
|
||||||
public class TestQuerySolr {
|
public class TestQuerySolr {
|
||||||
static final String DEFAULT_SOLR_CORE = "testCollection";
|
static final String DEFAULT_SOLR_CORE = "testCollection";
|
||||||
|
|
Loading…
Reference in New Issue