NIFI-11518 Upgraded Jetty from 9.4.51 to 10.0.15

- Updated Jetty WebSocket components using Jetty 10 components
- Upgraded Solr components from 8.11.2 to 9.2.1 to align with Jetty 10 dependencies

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #7622
This commit is contained in:
exceptionfactory 2023-05-02 21:06:47 -05:00 committed by Matt Burgess
parent c0c1b386f6
commit c08560447f
31 changed files with 218 additions and 951 deletions

View File

@ -42,14 +42,6 @@
Default web.xml file.
This file is applied to a Web application before it's own WEB_INF/web.xml file
</description>
<!-- ==================================================================== -->
<!-- Removes static references to beans from javax.el.BeanELResolver to -->
<!-- ensure webapp classloader can be released on undeploy -->
<!-- ==================================================================== -->
<listener>
<listener-class>org.eclipse.jetty.servlet.listener.ELContextCleaner</listener-class>
</listener>
<!-- ==================================================================== -->
<!-- Removes static configurationCache of Methods from java.beans.Introspector to -->

View File

@ -34,9 +34,6 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.apache.nifi.jetty.configuration.connector.StandardServerConnectorFactory;
@ -155,11 +152,6 @@ public class JettyServer {
// instruction jetty to examine these jars for tlds, web-fragments, etc
webappContext.setAttribute("org.eclipse.jetty.server.webapp.ContainerIncludeJarPattern", ".*/[^/]*servlet-api-[^/]*\\.jar$|.*/javax.servlet.jsp.jstl-.*\\\\.jar$|.*/[^/]*taglibs.*\\.jar$" );
// remove slf4j server class to allow WAR files to have slf4j dependencies in WEB-INF/lib
List<String> serverClasses = new ArrayList<>(Arrays.asList(webappContext.getServerClasses()));
serverClasses.remove("org.slf4j.");
webappContext.setServerClasses(serverClasses.toArray(new String[0]));
webappContext.setDefaultsDescriptor(WEB_DEFAULTS_XML);
// get the temp directory for this webapp

View File

@ -64,7 +64,6 @@ import org.apache.nifi.web.server.filter.RestApiRequestFilterProvider;
import org.apache.nifi.web.server.filter.StandardRequestFilterProvider;
import org.apache.nifi.web.server.log.RequestLogProvider;
import org.apache.nifi.web.server.log.StandardRequestLogProvider;
import org.eclipse.jetty.annotations.AnnotationConfiguration;
import org.eclipse.jetty.deploy.App;
import org.eclipse.jetty.deploy.DeploymentManager;
import org.eclipse.jetty.server.Connector;
@ -81,8 +80,6 @@ import org.eclipse.jetty.servlet.ErrorPageErrorHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.webapp.Configuration;
import org.eclipse.jetty.webapp.JettyWebXmlConfiguration;
import org.eclipse.jetty.webapp.WebAppClassLoader;
import org.eclipse.jetty.webapp.WebAppContext;
import org.slf4j.Logger;
@ -107,7 +104,6 @@ import java.net.NetworkInterface;
import java.net.SocketException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -197,10 +193,6 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
// create the server
this.server = new Server(threadPool);
// enable the annotation based configuration to ensure the jsp container is initialized properly
final Configuration.ClassList classlist = Configuration.ClassList.setServerDefault(server);
classlist.addBefore(JettyWebXmlConfiguration.class.getName(), AnnotationConfiguration.class.getName());
// configure server
configureConnectors(server);
@ -593,10 +585,6 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
// instruction jetty to examine these jars for tlds, web-fragments, etc
webappContext.setAttribute(CONTAINER_INCLUDE_PATTERN_KEY, CONTAINER_INCLUDE_PATTERN_VALUE);
// remove slf4j server class to allow WAR files to have slf4j dependencies in WEB-INF/lib
List<String> serverClasses = new ArrayList<>(Arrays.asList(webappContext.getServerClasses()));
serverClasses.remove("org.slf4j.");
webappContext.setServerClasses(serverClasses.toArray(new String[0]));
webappContext.setDefaultsDescriptor(WEB_DEFAULTS_XML);
webappContext.getMimeTypes().addMimeMapping("ttf", "font/ttf");
webappContext.setErrorHandler(getErrorHandler());

View File

@ -18,14 +18,15 @@ package org.apache.nifi.web.server.util;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Scanner;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.File;
@ -38,7 +39,7 @@ import static org.apache.nifi.security.util.SslContextFactory.createSslContext;
* File Scanner for Keystore or Truststore reloading using provided TLS Configuration
*/
public class StoreScanner extends ContainerLifeCycle implements Scanner.DiscreteListener {
private static final Logger LOG = Log.getLogger(StoreScanner.class);
private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
private final SslContextFactory sslContextFactory;
private final TlsConfiguration tlsConfiguration;
@ -114,8 +115,8 @@ public class StoreScanner extends ContainerLifeCycle implements Scanner.Discrete
public void scan() {
LOG.debug("Resource [{}] scanning started", resourceName);
this.scanner.scan();
this.scanner.scan();
this.scanner.scan(new Callback.Completable());
this.scanner.scan(new Callback.Completable());
}
@ManagedOperation(

View File

@ -42,14 +42,6 @@
Default web.xml file.
This file is applied to a Web application before it's own WEB_INF/web.xml file
</description>
<!-- ==================================================================== -->
<!-- Removes static references to beans from javax.el.BeanELResolver to -->
<!-- ensure webapp classloader can be released on undeploy -->
<!-- ==================================================================== -->
<listener>
<listener-class>org.eclipse.jetty.servlet.listener.ELContextCleaner</listener-class>
</listener>
<!-- ==================================================================== -->
<!-- Removes static cache of Methods from java.beans.Introspector to -->

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.webapp.WebAppContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -77,7 +78,7 @@ public class NiFiTestServer {
// TODO: Refactor this method to use proper factory methods
private void createSecureConnector() {
org.eclipse.jetty.util.ssl.SslContextFactory contextFactory = new org.eclipse.jetty.util.ssl.SslContextFactory.Server();
SslContextFactory.Server contextFactory = new SslContextFactory.Server();
// require client auth when not supporting login or anonymous access
if (StringUtils.isBlank(properties.getProperty(NiFiProperties.SECURITY_USER_LOGIN_IDENTITY_PROVIDER))) {

View File

@ -28,6 +28,12 @@
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<!-- Add Jetty 10 websocket for Hadoop YarnClientImpl.shellToContainer -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -49,6 +55,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<!-- Exclude Jetty 9.4 -->
<exclusion>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -59,6 +59,12 @@
</exclusion>
</exclusions>
</dependency>
<!-- Add Jetty 10 websocket for Hadoop YarnClientImpl.shellToContainer -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -80,6 +86,11 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<!-- Exclude Jetty 9.4 -->
<exclusion>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Override Guava 27 -->

View File

@ -83,6 +83,12 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<!-- Add Jetty 10 websocket for Hadoop YarnClientImpl.shellToContainer -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
@ -133,6 +139,11 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<!-- Exclude Jetty 9.4 -->
<exclusion>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -285,6 +296,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<!-- Exclude Jetty 9.4 -->
<exclusion>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -92,6 +92,12 @@
<artifactId>iceberg-orc</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- Add Jetty 10 websocket for Hadoop YarnClientImpl.shellToContainer -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -117,6 +123,11 @@
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</exclusion>
<!-- Exclude Jetty 9.4 -->
<exclusion>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -68,6 +68,11 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<!-- Exclude Jetty 9.4 -->
<exclusion>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -52,12 +52,6 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>

View File

@ -103,7 +103,7 @@ public class PrometheusServer {
this.handler = new ServletContextHandler(server, "/metrics");
this.handler.addServlet(new ServletHolder(new MetricsServlet()), "/");
SslContextFactory sslFactory = createSslFactory(sslContextService, needClientAuth, wantClientAuth);
SslContextFactory.Server sslFactory = createSslFactory(sslContextService, needClientAuth, wantClientAuth);
HttpConfiguration httpsConfiguration = new HttpConfiguration();
httpsConfiguration.setSecureScheme("https");
httpsConfiguration.setSecurePort(addr);
@ -123,7 +123,7 @@ public class PrometheusServer {
}
}
private SslContextFactory createSslFactory(final SSLContextService sslService, boolean needClientAuth, boolean wantClientAuth) {
private SslContextFactory.Server createSslFactory(final SSLContextService sslService, boolean needClientAuth, boolean wantClientAuth) {
final SslContextFactory.Server sslFactory = new SslContextFactory.Server();
sslFactory.setNeedClientAuth(needClientAuth);

View File

@ -121,6 +121,12 @@
<version>1.3.9-1</version>
</dependency>
<!-- hadoop-client is needed for auditing to HDFS -->
<!-- Add Jetty 10 websocket for Hadoop YarnClientImpl.shellToContainer -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -138,6 +144,11 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<!-- Exclude Jetty 9.4 -->
<exclusion>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- hadoop-common and hadoop-auth are transitive dependencies of ranger client, but we need to make sure they

View File

@ -23,7 +23,7 @@
<artifactId>nifi-solr-processors</artifactId>
<packaging>jar</packaging>
<properties>
<solr.version>8.11.2</solr.version>
<solr.version>9.2.1</solr.version>
</properties>
<dependencies>
<dependency>
@ -39,22 +39,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<!-- Override woodstox-core 6.2.4 from Solr 8.11.1 -->
<dependency>
<groupId>com.fasterxml.woodstox</groupId>
<artifactId>woodstox-core</artifactId>
<version>6.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>${solr.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@ -64,6 +52,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
@ -98,6 +90,17 @@
<artifactId>gson</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>${solr.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
@ -116,45 +119,6 @@
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<!-- Need to declare the newer versions of these b/c NiFi uses Lucene 4.10.3 -->
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>${solr.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
<version>${solr.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.xmlunit</groupId>
<artifactId>xmlunit-matchers</artifactId>

View File

@ -16,9 +16,7 @@
*/
package org.apache.nifi.processors.solr;
import java.lang.invoke.MethodHandles;
import java.security.Principal;
import java.util.Optional;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
@ -38,8 +36,6 @@ import org.apache.solr.client.solrj.impl.HttpClientBuilderFactory;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
import org.apache.solr.client.solrj.impl.SolrPortAwareCookieSpecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is a modified version of Krb5HttpClientBuilder that is part of SolrJ.
@ -49,23 +45,17 @@ import org.slf4j.LoggerFactory;
*/
public class KerberosHttpClientBuilder implements HttpClientBuilderFactory {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public KerberosHttpClientBuilder() {
}
public SolrHttpClientBuilder getBuilder() {
return getBuilder(HttpClientUtil.getHttpClientBuilder());
}
public void close() {
HttpClientUtil.removeRequestInterceptor(bufferedEntityInterceptor);
}
@Override
public SolrHttpClientBuilder getHttpClientBuilder(Optional<SolrHttpClientBuilder> builder) {
return builder.isPresent() ? getBuilder(builder.get()) : getBuilder();
public SolrHttpClientBuilder getHttpClientBuilder(SolrHttpClientBuilder builder) {
return getBuilder(builder);
}
public SolrHttpClientBuilder getBuilder(SolrHttpClientBuilder builder) {

View File

@ -50,9 +50,10 @@ import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
@ -238,9 +239,10 @@ public class SolrUtils {
public static final String REPEATING_PARAM_PATTERN = "[\\w\\.]+\\.\\d+$";
private static final String ROOT_PATH = "/";
@SuppressWarnings("deprecation")
public static synchronized SolrClient createSolrClient(final PropertyContext context, final String solrLocation) {
final Integer socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int socketTimeout = context.getProperty(SOLR_SOCKET_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int connectionTimeout = context.getProperty(SOLR_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer maxConnections = context.getProperty(SOLR_MAX_CONNECTIONS).asInteger();
final Integer maxConnectionsPerHost = context.getProperty(SOLR_MAX_CONNECTIONS_PER_HOST).asInteger();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
@ -253,7 +255,7 @@ public class SolrUtils {
// has to happen before the client is created below so that correct configurer would be set if needed
if (kerberosCredentialsService != null || (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword))) {
HttpClientUtil.setHttpClientBuilder(new KerberosHttpClientBuilder().getHttpClientBuilder(Optional.empty()));
HttpClientUtil.setHttpClientBuilder(new KerberosHttpClientBuilder().getHttpClientBuilder(SolrHttpClientBuilder.create()));
}
if (sslContextService != null) {
@ -287,13 +289,15 @@ public class SolrUtils {
String zkChrootPath = getZooKeeperChrootPathSuffix(solrLocation);
final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue();
final Integer zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int zkClientTimeout = context.getProperty(ZK_CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int zkConnectionTimeout = context.getProperty(ZK_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(zkList, Optional.of(zkChrootPath)).withHttpClient(httpClient).build();
final CloudLegacySolrClient cloudSolrClient = new CloudLegacySolrClient.Builder(zkList, Optional.of(zkChrootPath))
.withConnectionTimeout(zkConnectionTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(zkClientTimeout, TimeUnit.MILLISECONDS)
.withHttpClient(httpClient)
.build();
cloudSolrClient.setDefaultCollection(collection);
cloudSolrClient.setZkClientTimeout(zkClientTimeout);
cloudSolrClient.setZkConnectTimeout(zkConnectionTimeout);
return cloudSolrClient;
}
}

View File

@ -23,9 +23,11 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.NodeConfig;
import org.apache.solr.logging.LogWatcherConfig;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
/**
@ -60,7 +62,8 @@ public class EmbeddedSolrServerFactory {
public static SolrClient create(String solrHome, String coreName, String dataDir)
throws IOException {
NodeConfig.NodeConfigBuilder nodeConfig = new NodeConfig.NodeConfigBuilder(coreName, Paths.get(solrHome));
final Path homePath = Paths.get(solrHome).toAbsolutePath();
NodeConfig.NodeConfigBuilder nodeConfig = new NodeConfig.NodeConfigBuilder(coreName, homePath);
if (dataDir != null) {
File coreDataDir = new File(dataDir + "/" + coreName);
@ -70,7 +73,10 @@ public class EmbeddedSolrServerFactory {
nodeConfig.setSolrDataHome(coreDataDir.getPath());
}
final CoreContainer coreContainer = new CoreContainer(new NodeConfig.NodeConfigBuilder(coreName, Paths.get(solrHome)).build());
final LogWatcherConfig logWatcherConfig = new LogWatcherConfig(false, EmbeddedSolrServerFactory.class.getSimpleName(), "ERROR", 0);
final CoreContainer coreContainer = new CoreContainer(new NodeConfig.NodeConfigBuilder(coreName, homePath)
.setLogWatcherConfig(logWatcherConfig)
.build());
coreContainer.load();
return new EmbeddedSolrServer(coreContainer, coreName);

View File

@ -1,664 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.solr;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import com.google.gson.stream.JsonReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.SolrInputDocument;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.xmlunit.matchers.CompareMatcher;
public class QuerySolrIT {
/*
This integration test expects a Solr instance running locally in SolrCloud mode, coordinated by a single ZooKeeper
instance accessible with the ZooKeeper-Connect-String "localhost:2181".
*/
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
private static final SimpleDateFormat DATE_FORMAT_SOLR_COLLECTION = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss", Locale.US);
private static String SOLR_COLLECTION;
private static String ZK_CONFIG_PATH;
private static String ZK_CONFIG_NAME;
private static String SOLR_LOCATION = "localhost:2181";
static {
DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
Date date = new Date();
SOLR_COLLECTION = DATE_FORMAT_SOLR_COLLECTION.format(date) + "_QuerySolrIT";
ZK_CONFIG_PATH = "src/test/resources/solr/testCollection/conf";
ZK_CONFIG_NAME = "QuerySolrIT_config";
}
@BeforeAll
public static void setup() throws IOException, SolrServerException {
CloudSolrClient solrClient = createSolrClient();
Path currentDir = Paths.get(ZK_CONFIG_PATH);
ZkClientClusterStateProvider stateProvider = new ZkClientClusterStateProvider(SOLR_LOCATION);
stateProvider.uploadConfig(currentDir, ZK_CONFIG_NAME);
solrClient.setDefaultCollection(SOLR_COLLECTION);
if (!solrClient.getZkStateReader().getClusterState().hasCollection(SOLR_COLLECTION)) {
CollectionAdminRequest.Create createCollection = CollectionAdminRequest.createCollection(SOLR_COLLECTION, ZK_CONFIG_NAME, 1, 1);
createCollection.process(solrClient);
} else {
solrClient.deleteByQuery("*:*");
}
for (int i = 0; i < 10; i++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "doc" + i);
Date date = new Date();
doc.addField("created", DATE_FORMAT.format(date));
doc.addField("string_single", "single" + i + ".1");
doc.addField("string_multi", "multi" + i + ".1");
doc.addField("string_multi", "multi" + i + ".2");
doc.addField("integer_single", i);
doc.addField("integer_multi", 1);
doc.addField("integer_multi", 2);
doc.addField("integer_multi", 3);
doc.addField("double_single", 0.5 + i);
solrClient.add(doc);
}
solrClient.commit();
}
public static CloudSolrClient createSolrClient() {
CloudSolrClient solrClient = null;
try {
solrClient = new CloudSolrClient.Builder(Collections.singletonList(SOLR_LOCATION), Optional.empty()).build();
solrClient.setDefaultCollection(SOLR_COLLECTION);
} catch (Exception e) {
e.printStackTrace();
}
return solrClient;
}
@AfterAll
public static void teardown() {
try {
CloudSolrClient solrClient = createSolrClient();
CollectionAdminRequest.Delete deleteCollection = CollectionAdminRequest.deleteCollection(SOLR_COLLECTION);
deleteCollection.process(solrClient);
solrClient.close();
} catch (Exception e) {
}
}
private TestRunner createRunnerWithSolrClient(SolrClient solrClient) {
final TestableProcessor proc = new TestableProcessor(solrClient);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_LOCATION);
runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION);
return runner;
}
@Test
public void testAllFacetCategories() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty("facet", "true");
runner.setProperty("facet.field", "integer_multi");
runner.setProperty("facet.interval", "integer_single");
runner.setProperty("facet.interval.set.1", "[4,7]");
runner.setProperty("facet.interval.set.2", "[5,7]");
runner.setProperty("facet.range", "created");
runner.setProperty("facet.range.start", "NOW/MINUTE");
runner.setProperty("facet.range.end", "NOW/MINUTE+1MINUTE");
runner.setProperty("facet.range.gap", "+20SECOND");
runner.setProperty("facet.query.1", "*:*");
runner.setProperty("facet.query.2", "integer_multi:2");
runner.setProperty("facet.query.3", "integer_multi:3");
runner.enqueue(new ByteArrayInputStream(new byte[0]));
runner.run();
runner.assertTransferCount(QuerySolr.FACETS, 1);
JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))));
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
if (name.equals("facet_queries")) {
assertEquals(30, returnCheckSumForArrayOfJsonObjects(reader));
} else if (name.equals("facet_fields")) {
reader.beginObject();
assertEquals(reader.nextName(), "integer_multi");
assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 30);
reader.endObject();
} else if (name.equals("facet_ranges")) {
reader.beginObject();
assertEquals(reader.nextName(), "created");
assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 10);
reader.endObject();
} else if (name.equals("facet_intervals")) {
reader.beginObject();
assertEquals(reader.nextName(), "integer_single");
assertEquals(returnCheckSumForArrayOfJsonObjects(reader), 7);
reader.endObject();
}
}
reader.endObject();
reader.close();
solrClient.close();
}
private int returnCheckSumForArrayOfJsonObjects(JsonReader reader) throws IOException {
int checkSum = 0;
reader.beginArray();
while (reader.hasNext()) {
reader.beginObject();
while (reader.hasNext()) {
if (reader.nextName().equals("count")) {
checkSum += reader.nextInt();
} else {
reader.skipValue();
}
}
reader.endObject();
}
reader.endArray();
return checkSum;
}
@Test
public void testFacetTrueButNull() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty("facet", "true");
runner.setProperty("stats", "true");
runner.enqueue(new ByteArrayInputStream(new byte[0]));
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 1);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
// Check for empty nestet Objects in JSON
JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0)))));
reader.beginObject();
while (reader.hasNext()) {
if (reader.nextName().equals("facet_queries")) {
reader.beginArray();
assertFalse(reader.hasNext());
reader.endArray();
} else {
reader.beginObject();
assertFalse(reader.hasNext());
reader.endObject();
}
}
reader.endObject();
JsonReader reader_stats = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))));
reader_stats.beginObject();
assertEquals(reader_stats.nextName(), "stats_fields");
reader_stats.beginObject();
assertFalse(reader_stats.hasNext());
reader_stats.endObject();
reader_stats.endObject();
reader.close();
reader_stats.close();
solrClient.close();
}
@Test
public void testStats() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty("stats", "true");
runner.setProperty("stats.field", "integer_single");
runner.enqueue(new ByteArrayInputStream(new byte[0]));
runner.run();
runner.assertTransferCount(QuerySolr.STATS, 1);
JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0)))));
reader.beginObject();
assertEquals(reader.nextName(), "stats_fields");
reader.beginObject();
assertEquals(reader.nextName(), "integer_single");
reader.beginObject();
while (reader.hasNext()) {
String name = reader.nextName();
switch (name) {
case "min": assertEquals(reader.nextString(), "0.0"); break;
case "max": assertEquals(reader.nextString(), "9.0"); break;
case "count": assertEquals(reader.nextInt(), 10); break;
case "sum": assertEquals(reader.nextString(), "45.0"); break;
default: reader.skipValue(); break;
}
}
reader.endObject();
reader.endObject();
reader.endObject();
reader.close();
solrClient.close();
}
@Test
public void testRelationshipRoutings() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty("facet", "true");
runner.setProperty("stats", "true");
// Set request handler for request failure
runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/nonexistentrequesthandler");
// Processor has no input connection and fails
runner.setNonLoopConnection(false);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
runner.clearTransferState();
// Processor has an input connection and fails
runner.setNonLoopConnection(true);
runner.enqueue(new byte[0]);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(QuerySolr.FAILURE, 1);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.FAILURE).get(0);
flowFile.assertAttributeExists(QuerySolr.EXCEPTION);
flowFile.assertAttributeExists(QuerySolr.EXCEPTION_MESSAGE);
runner.clearTransferState();
// Set request handler for successful request
runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "/select");
// Processor has no input connection and succeeds
runner.setNonLoopConnection(false);
runner.run(1, false);
runner.assertTransferCount(QuerySolr.RESULTS, 1);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
runner.clearTransferState();
// Processor has an input connection and succeeds
runner.setNonLoopConnection(true);
runner.enqueue(new byte[0]);
runner.run(1, true);
runner.assertTransferCount(QuerySolr.RESULTS, 1);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
runner.assertAllFlowFilesContainAttribute(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.FACETS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
flowFile = runner.getFlowFilesForRelationship(QuerySolr.STATS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_CONNECT);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
runner.clearTransferState();
solrClient.close();
}
@Test
public void testExpressionLanguageForProperties() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "${query}");
runner.setProperty(QuerySolr.SOLR_PARAM_REQUEST_HANDLER, "${handler}");
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "${fields}");
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "${sort}");
runner.setProperty(QuerySolr.SOLR_PARAM_START, "${start}");
runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "${rows}");
runner.enqueue(new byte[0], new HashMap<String,String>(){{
put("query", "id:(doc0 OR doc1 OR doc2 OR doc3)");
put("handler", "/select");
put("fields", "id");
put("sort", "id desc");
put("start", "1");
put("rows", "2");
}});
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 1);
String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc></docs>";
assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
solrClient.close();
}
@Test
public void testSingleFilterQuery() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
runner.setProperty("fq", "id:(doc2 OR doc3)");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 1);
String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>";
assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
solrClient.close();
}
@Test
public void testMultipleFilterQueries() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
runner.setProperty("fq.1", "id:(doc0 OR doc1 OR doc2 OR doc3)");
runner.setProperty("fq.2", "id:(doc1 OR doc2 OR doc3 OR doc4)");
runner.setProperty("fq.3", "id:(doc2 OR doc3 OR doc4 OR doc5)");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 1);
String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc2</field></doc><doc boost=\"1.0\"><field name=\"id\">doc3</field></doc></docs>";
assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
solrClient.close();
}
@Test
public void testStandardResponse() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:(doc0 OR doc1)");
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id desc");
runner.setNonLoopConnection(false);
runner.run();
runner.assertAllFlowFilesTransferred(QuerySolr.RESULTS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_CURSOR_MARK);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_SOLR_STATUS);
flowFile.assertAttributeExists(QuerySolr.ATTRIBUTE_QUERY_TIME);
String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc1</field></doc><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>";
assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile))));
solrClient.close();
}
@Test
public void testPreserveOriginalContent() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_QUERY, "id:doc0");
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
String content = "test content 123";
runner.enqueue(content);
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 1);
runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
String expectedXml = "<docs><doc boost=\"1.0\"><field name=\"id\">doc0</field></doc></docs>";
assertThat(expectedXml, CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
assertEquals(content, new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.ORIGINAL).get(0))));
solrClient.close();
}
@Test
public void testRetrievalOfFullResults() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "2");
runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 5);
runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
runner.assertTransferCount(QuerySolr.STATS, 0);
runner.assertTransferCount(QuerySolr.FACETS, 0);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(QuerySolr.RESULTS);
Integer documentCounter = 0;
Integer startParam = 0;
for (MockFlowFile flowFile : flowFiles) {
Map<String,String> attributes = flowFile.getAttributes();
assertEquals(attributes.get(QuerySolr.ATTRIBUTE_SOLR_START), startParam.toString());
startParam += 2;
StringBuffer expectedXml = new StringBuffer()
.append("<docs><doc boost=\"1.0\"><field name=\"id\">doc")
.append(documentCounter++)
.append("</field></doc><doc boost=\"1.0\"><field name=\"id\">doc")
.append(documentCounter++)
.append("</field></doc></docs>");
assertThat(expectedXml.toString(), CompareMatcher.isIdenticalTo(new String(runner.getContentAsByteArray(flowFile))));
}
solrClient.close();
}
@Test
public void testRetrievalOfFullResults2() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
runner.setProperty("facet", "true");
runner.setProperty("stats", "true");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 4);
runner.assertTransferCount(QuerySolr.ORIGINAL, 1);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
solrClient.close();
}
@Test
public void testRetrievalOfFullResults3() throws IOException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id");
runner.setProperty(QuerySolr.SOLR_PARAM_SORT, "id asc");
runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "3");
runner.setProperty(QuerySolr.AMOUNT_DOCUMENTS_TO_RETURN, QuerySolr.RETURN_ALL_RESULTS);
runner.setProperty("facet", "true");
runner.setProperty("stats", "true");
runner.setNonLoopConnection(false);
runner.run();
runner.assertTransferCount(QuerySolr.RESULTS, 4);
runner.assertTransferCount(QuerySolr.ORIGINAL, 0);
runner.assertTransferCount(QuerySolr.FACETS, 1);
runner.assertTransferCount(QuerySolr.STATS, 1);
solrClient.close();
}
@Test
public void testRecordResponse() throws IOException, InitializationException {
SolrClient solrClient = createSolrClient();
TestRunner runner = createRunnerWithSolrClient(solrClient);
runner.setProperty(QuerySolr.RETURN_TYPE, QuerySolr.MODE_REC.getValue());
runner.setProperty(QuerySolr.SOLR_PARAM_FIELD_LIST, "id,created,integer_single");
runner.setProperty(QuerySolr.SOLR_PARAM_ROWS, "10");
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc")));
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.enableControllerService(jsonWriter);
runner.setProperty(SolrUtils.RECORD_WRITER, "writer");
runner.setNonLoopConnection(false);
runner.run(1);
runner.assertQueueEmpty();
runner.assertTransferCount(QuerySolr.RESULTS, 1);
JsonReader reader = new JsonReader(new InputStreamReader(new ByteArrayInputStream(
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(QuerySolr.RESULTS).get(0)))));
reader.beginArray();
int controlScore = 0;
while (reader.hasNext()) {
reader.beginObject();
while (reader.hasNext()) {
if (reader.nextName().equals("integer_single")) {
controlScore += reader.nextInt();
} else {
reader.skipValue();
}
}
reader.endObject();
}
reader.close();
solrClient.close();
assertEquals(controlScore, 45);
}
@Test
public void testSslContextService() throws IOException, InitializationException {
final QuerySolr proc = Mockito.mock(QuerySolr.class);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(SolrUtils.SOLR_LOCATION, SOLR_LOCATION);
runner.setProperty(SolrUtils.COLLECTION, SOLR_COLLECTION);
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn("ssl-context");
runner.addControllerService("ssl-context", sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(SolrUtils.SSL_CONTEXT_SERVICE, "ssl-context");
proc.onScheduled(runner.getProcessContext());
Mockito.verify(proc, Mockito.times(1)).createSolrClient(Mockito.any(ProcessContext.class), Mockito.eq(SOLR_LOCATION));
}
// Override createSolrClient and return the passed in SolrClient
private class TestableProcessor extends QuerySolr {
private SolrClient solrClient;
public TestableProcessor(SolrClient solrClient) {
this.solrClient = solrClient;
}
@Override
protected SolrClient createSolrClient(ProcessContext context, String solrLocation) {
return solrClient;
}
}
}

View File

@ -598,7 +598,7 @@ public class HandleHttpRequest extends AbstractProcessor {
final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
String tempDir = System.getProperty("java.io.tmpdir");
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize));
request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize));
List<Part> parts = null;
try {
parts = Collections.unmodifiableList(new ArrayList<>(request.getParts()));

View File

@ -288,7 +288,7 @@ public class ListenHTTPServlet extends HttpServlet {
throws IOException, IllegalStateException, ServletException {
Set<FlowFile> flowFileSet = new HashSet<>();
String tempDir = System.getProperty("java.io.tmpdir");
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
int i = 0;
final Collection<Part> requestParts = request.getParts();
for (final Part part : requestParts) {

View File

@ -32,9 +32,19 @@
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-jetty-configuration</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<artifactId>websocket-jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>

View File

@ -17,13 +17,8 @@
package org.apache.nifi.websocket.jetty;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.websocket.AbstractWebSocketService;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import java.util.ArrayList;
import java.util.List;
@ -64,42 +59,4 @@ public abstract class AbstractJettyWebSocketService extends AbstractWebSocketSer
descriptors.add(MAX_BINARY_MESSAGE_SIZE);
return descriptors;
}
protected SslContextFactory createSslFactory(final SSLContextService sslService, final boolean needClientAuth, final boolean wantClientAuth, final String endpointIdentificationAlgorithm) {
final SslContextFactory sslFactory = new SslContextFactory.Server();
sslFactory.setNeedClientAuth(needClientAuth);
sslFactory.setWantClientAuth(wantClientAuth);
// Need to set SslContextFactory's endpointIdentificationAlgorithm.
// For clients, hostname verification should be enabled.
// For servers, hostname verification should be disabled.
// Previous to Jetty 9.4.15.v20190215, this defaulted to null, and now defaults to "HTTPS".
sslFactory.setEndpointIdentificationAlgorithm(endpointIdentificationAlgorithm);
if (sslService.isKeyStoreConfigured()) {
sslFactory.setKeyStorePath(sslService.getKeyStoreFile());
sslFactory.setKeyStorePassword(sslService.getKeyStorePassword());
sslFactory.setKeyStoreType(sslService.getKeyStoreType());
}
if (sslService.isTrustStoreConfigured()) {
sslFactory.setTrustStorePath(sslService.getTrustStoreFile());
sslFactory.setTrustStorePassword(sslService.getTrustStorePassword());
sslFactory.setTrustStoreType(sslService.getTrustStoreType());
}
return sslFactory;
}
protected void configurePolicy(final ConfigurationContext context, final WebSocketPolicy policy) {
final int inputBufferSize = context.getProperty(INPUT_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxTextMessageSize = context.getProperty(MAX_TEXT_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
final int maxBinaryMessageSize = context.getProperty(MAX_BINARY_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
policy.setInputBufferSize(inputBufferSize);
policy.setMaxTextMessageSize(maxTextMessageSize);
policy.setMaxBinaryMessageSize(maxBinaryMessageSize);
}
}

View File

@ -28,6 +28,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@ -37,13 +38,17 @@ import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -223,25 +228,31 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
@Override
public void startClient(final ConfigurationContext context) throws Exception {
configurationContext = context;
final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
SslContextFactory sslContextFactory = null;
if (sslService != null) {
sslContextFactory = createSslFactory(sslService, false, false, null);
}
HttpClient httpClient = new HttpClient(sslContextFactory);
final HttpClient httpClient;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
if (sslContextService == null) {
httpClient = new HttpClient();
} else {
final SslContextFactory.Client sslContextFactory = new SslContextFactory.Client();
final SSLContext sslContext = sslContextService.createContext();
sslContextFactory.setSslContext(sslContext);
final ClientConnector clientConnector = new ClientConnector();
clientConnector.setSslContextFactory(sslContextFactory);
httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector));
}
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
if (proxyHost != null && proxyPort != null) {
HttpProxy httpProxy = new HttpProxy(proxyHost, proxyPort);
httpClient.getProxyConfiguration().getProxies().add(httpProxy);
httpClient.getProxyConfiguration().addProxy(httpProxy);
}
client = new WebSocketClient(httpClient);
configurePolicy(context, client.getPolicy());
configurePolicy(context, client);
final String userName = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
final String userPassword = context.getProperty(USER_PASSWORD).evaluateAttributeExpressions().getValue();
final String customAuth = context.getProperty(CUSTOM_AUTH).evaluateAttributeExpressions().getValue();
@ -433,4 +444,13 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
public String getTargetUri() {
return webSocketUri.toString();
}
private void configurePolicy(final ConfigurationContext context, final WebSocketPolicy policy) {
final int inputBufferSize = context.getProperty(INPUT_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxTextMessageSize = context.getProperty(MAX_TEXT_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
final int maxBinaryMessageSize = context.getProperty(MAX_BINARY_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
policy.setInputBufferSize(inputBufferSize);
policy.setMaxTextMessageSize(maxTextMessageSize);
policy.setMaxBinaryMessageSize(maxBinaryMessageSize);
}
}

View File

@ -29,6 +29,8 @@ import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.jetty.configuration.connector.StandardServerConnectorFactory;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
@ -41,25 +43,22 @@ import org.eclipse.jetty.security.HashLoginService;
import org.eclipse.jetty.security.LoginService;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.security.Constraint;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.eclipse.jetty.websocket.server.JettyServerUpgradeRequest;
import org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse;
import org.eclipse.jetty.websocket.server.JettyWebSocketCreator;
import org.eclipse.jetty.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.websocket.server.JettyWebSocketServletFactory;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import javax.net.ssl.SSLContext;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@ -166,8 +165,7 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
private static final List<PropertyDescriptor> properties;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.addAll(getAbstractPropertyDescriptors());
final List<PropertyDescriptor> props = new ArrayList<>(getAbstractPropertyDescriptors());
props.add(LISTEN_PORT);
props.add(SSL_CONTEXT);
props.add(CLIENT_AUTH);
@ -177,14 +175,11 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
props.add(LOGIN_SERVICE);
props.add(USERS_PROPERTIES_FILE);
properties = Collections.unmodifiableList(props);
}
private WebSocketPolicy configuredPolicy;
private Server server;
private Integer listenPort;
private ServletHandler servletHandler;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -198,7 +193,7 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
final List<ValidationResult> results = new ArrayList<>();
if (validationContext.getProperty(BASIC_AUTH).asBoolean()) {
final String loginServiceValue = validationContext.getProperty(LOGIN_SERVICE).getValue();
if (LOGIN_SERVICE_HASH.equals(loginServiceValue)) {
if (LOGIN_SERVICE_HASH.getValue().equals(loginServiceValue)) {
if (!validationContext.getProperty(USERS_PROPERTIES_FILE).isSet()) {
results.add(new ValidationResult.Builder().subject(USERS_PROPERTIES_FILE.getDisplayName())
.explanation("it is required by HashLoginService").valid(false).build());
@ -209,16 +204,28 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
return results;
}
public static class JettyWebSocketServlet extends WebSocketServlet implements WebSocketCreator {
public static class StandardJettyWebSocketServlet extends JettyWebSocketServlet implements JettyWebSocketCreator {
private final ConfigurationContext context;
public StandardJettyWebSocketServlet(final ConfigurationContext context) {
this.context = context;
}
@Override
public void configure(WebSocketServletFactory webSocketServletFactory) {
public void configure(final JettyWebSocketServletFactory webSocketServletFactory) {
final int inputBufferSize = context.getProperty(INPUT_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxTextMessageSize = context.getProperty(MAX_TEXT_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
final int maxBinaryMessageSize = context.getProperty(MAX_BINARY_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
webSocketServletFactory.setInputBufferSize(inputBufferSize);
webSocketServletFactory.setMaxTextMessageSize(maxTextMessageSize);
webSocketServletFactory.setMaxBinaryMessageSize(maxBinaryMessageSize);
webSocketServletFactory.setCreator(this);
}
@Override
public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
public Object createWebSocket(JettyServerUpgradeRequest servletUpgradeRequest, JettyServerUpgradeResponse servletUpgradeResponse) {
final URI requestURI = servletUpgradeRequest.getRequestURI();
final int port = servletUpgradeRequest.getLocalPort();
final int port = ((InetSocketAddress) servletUpgradeRequest.getLocalSocketAddress()).getPort();
final JettyWebSocketServer service = portToControllerService.get(port);
if (service == null) {
@ -233,33 +240,18 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
throw new IllegalStateException("Failed to get router due to: " + e, e);
}
final RoutingWebSocketListener listener = new RoutingWebSocketListener(router) {
@Override
public void onWebSocketConnect(Session session) {
final WebSocketPolicy currentPolicy = session.getPolicy();
currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize());
currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize());
currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize());
super.onWebSocketConnect(session);
}
};
return listener;
return new RoutingWebSocketListener(router);
}
}
@OnEnabled
@Override
public void startServer(final ConfigurationContext context) throws Exception {
if (server != null && server.isRunning()) {
getLogger().info("A WebSocket server is already running. {}", new Object[]{server});
getLogger().info("Jetty WebSocket Server running {}", server);
return;
}
configuredPolicy = WebSocketPolicy.newServerPolicy();
configurePolicy(context, configuredPolicy);
server = new Server();
final ContextHandlerCollection handlerCollection = new ContextHandlerCollection();
@ -292,7 +284,7 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
final LoginService loginService;
final String loginServiceValue = context.getProperty(LOGIN_SERVICE).getValue();
if (LOGIN_SERVICE_HASH.equals(loginServiceValue)) {
if (LOGIN_SERVICE_HASH.getValue().equals(loginServiceValue)) {
final String usersFilePath = context.getProperty(USERS_PROPERTIES_FILE).evaluateAttributeExpressions().getValue();
loginService = new HashLoginService("HashLoginService", usersFilePath);
} else {
@ -303,23 +295,22 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
securityHandler.setLoginService(loginService);
}
servletHandler = new ServletHandler();
ServletHandler servletHandler = new ServletHandler();
JettyWebSocketServletContainerInitializer.configure(contextHandler, null);
contextHandler.insertHandler(servletHandler);
handlerCollection.setHandlers(new Handler[]{contextHandler});
server.setHandler(handlerCollection);
listenPort = context.getProperty(LISTEN_PORT).evaluateAttributeExpressions().asInteger();
final SslContextFactory sslContextFactory = createSslFactory(context);
final ServerConnector serverConnector = createConnector(sslContextFactory, listenPort);
final ServerConnector serverConnector = getServerConnector(context);
server.setConnectors(new Connector[] {serverConnector});
servletHandler.addServletWithMapping(JettyWebSocketServlet.class, "/*");
final StandardJettyWebSocketServlet webSocketServlet = new StandardJettyWebSocketServlet(context);
servletHandler.addServletWithMapping(new ServletHolder(webSocketServlet), "/*");
getLogger().info("Starting JettyWebSocketServer on port {}.", new Object[]{listenPort});
getLogger().info("Starting Jetty WebSocket Server on Port {}", listenPort);
server.start();
listenPort = serverConnector.getLocalPort();
@ -330,44 +321,30 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
return listenPort;
}
private ServerConnector createConnector(final SslContextFactory sslContextFactory, final Integer listenPort) {
private ServerConnector getServerConnector(final ConfigurationContext context) {
final StandardServerConnectorFactory serverConnectorFactory = new StandardServerConnectorFactory(server, listenPort);
final ServerConnector serverConnector;
if (sslContextFactory == null) {
serverConnector = new ServerConnector(server);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
if (sslContextService == null) {
serverConnector = serverConnectorFactory.getServerConnector();
} else {
final HttpConfiguration httpsConfiguration = new HttpConfiguration();
httpsConfiguration.setSecureScheme("https");
httpsConfiguration.addCustomizer(new SecureRequestCustomizer());
serverConnector = new ServerConnector(server,
new SslConnectionFactory(sslContextFactory, "http/1.1"),
new HttpConnectionFactory(httpsConfiguration));
final SSLContext sslContext = sslContextService.createContext();
serverConnectorFactory.setSslContext(sslContext);
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
if (CLIENT_NEED.getValue().equals(clientAuthValue)) {
serverConnectorFactory.setNeedClientAuth(true);
} else if (CLIENT_WANT.getValue().equals(clientAuthValue)) {
serverConnectorFactory.setWantClientAuth(true);
}
serverConnector = serverConnectorFactory.getServerConnector();
}
serverConnector.setPort(listenPort);
return serverConnector;
}
private SslContextFactory createSslFactory(final ConfigurationContext context) {
final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
final boolean need;
final boolean want;
if (CLIENT_NEED.equals(clientAuthValue)) {
need = true;
want = false;
} else if (CLIENT_WANT.equals(clientAuthValue)) {
need = false;
want = true;
} else {
need = false;
want = false;
}
final SslContextFactory sslFactory = (sslService == null) ? null : createSslFactory(sslService, need, want, null);
return sslFactory;
}
@OnDisabled
@OnShutdown
@Override
@ -376,12 +353,11 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
return;
}
getLogger().info("Stopping JettyWebSocketServer.");
getLogger().info("Stopping Jetty WebSocket Server");
server.stop();
if (portToControllerService.containsKey(listenPort)
&& this.getIdentifier().equals(portToControllerService.get(listenPort).getIdentifier())) {
portToControllerService.remove(listenPort);
}
}
}

View File

@ -50,18 +50,18 @@ public class JettyWebSocketSession extends AbstractWebSocketSession {
}
@Override
public void close(final String reason) throws IOException {
public void close(final String reason) {
session.close(StatusCode.NORMAL, reason);
}
@Override
public InetSocketAddress getRemoteAddress() {
return session.getRemoteAddress();
return (InetSocketAddress) session.getRemoteAddress();
}
@Override
public InetSocketAddress getLocalAddress() {
return session.getLocalAddress();
return (InetSocketAddress) session.getLocalAddress();
}
@Override

View File

@ -21,14 +21,11 @@ import org.apache.nifi.jetty.configuration.connector.ServerConnectorFactory;
import org.apache.nifi.registry.jetty.connector.ApplicationServerConnectorFactory;
import org.apache.nifi.registry.jetty.handler.HandlerProvider;
import org.apache.nifi.registry.properties.NiFiRegistryProperties;
import org.eclipse.jetty.annotations.AnnotationConfiguration;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.webapp.Configuration;
import org.eclipse.jetty.webapp.JettyWebXmlConfiguration;
import org.eclipse.jetty.webapp.WebAppContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,10 +72,6 @@ public class JettyServer {
this.properties = properties;
this.server = new Server(threadPool);
// enable the annotation based configuration to ensure the jsp container is initialized properly
final Configuration.ClassList classlist = Configuration.ClassList.setServerDefault(server);
classlist.addBefore(JettyWebXmlConfiguration.class.getName(), AnnotationConfiguration.class.getName());
try {
configureConnectors();
final Handler handler = handlerProvider.getHandler(properties);

View File

@ -42,14 +42,6 @@
Default web.xml file.
This file is applied to a Web application before it's own WEB_INF/web.xml file
</description>
<!-- ==================================================================== -->
<!-- Removes static references to beans from javax.el.BeanELResolver to -->
<!-- ensure webapp classloader can be released on undeploy -->
<!-- ==================================================================== -->
<listener>
<listener-class>org.eclipse.jetty.servlet.listener.ELContextCleaner</listener-class>
</listener>
<!-- ==================================================================== -->
<!-- Removes static cache of Methods from java.beans.Introspector to -->

View File

@ -81,7 +81,7 @@ public class JettyITServerCustomizer implements WebServerFactoryCustomizer<Jetty
LOGGER.info("JettyServer is customized");
}
private SslContextFactory createSslContextFactory(Ssl properties) {
private SslContextFactory.Server createSslContextFactory(Ssl properties) {
// Calling SslContextFactory.Server() calls setEndpointIdentificationAlgorithm(null).
// This ensures that Jetty server does not attempt to validate a hostname in the client certificate's SAN.
final SslContextFactory.Server contextFactory = new SslContextFactory.Server();

View File

@ -62,7 +62,7 @@ public class TlsCertificateAuthorityService {
}
private static ServerConnector createSSLConnector(Server server, int port, KeyStore keyStore, String keyPassword) {
SslContextFactory sslContextFactory = new SslContextFactory.Server();
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setIncludeProtocols(TlsPlatform.getLatestProtocol());
sslContextFactory.setKeyStore(keyStore);
sslContextFactory.setKeyManagerPassword(keyPassword);

View File

@ -122,7 +122,7 @@
<org.slf4j.version>2.0.7</org.slf4j.version>
<derby.version>10.16.1.1</derby.version>
<ranger.version>2.4.0</ranger.version>
<jetty.version>9.4.51.v20230217</jetty.version>
<jetty.version>10.0.15</jetty.version>
<jackson.bom.version>2.15.2</jackson.bom.version>
<avro.version>1.11.2</avro.version>
<jaxb.runtime.version>2.3.5</jaxb.runtime.version>
@ -454,7 +454,6 @@
<version>${jetty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-client</artifactId>
@ -465,11 +464,6 @@
<artifactId>jetty-alpn-java-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>