NIFI-4683: Add ability to execute Spark jobs via Livy

This commit is contained in:
Matthew Burgess 2017-12-13 21:43:35 -05:00 committed by joewitt
parent d543cfde25
commit 2192138b06
24 changed files with 2318 additions and 1 deletions

View File

@ -541,6 +541,18 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-metrics-reporting-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-controller-service-api-nar</artifactId>
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-nar</artifactId>
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
<profiles>
<profile>

View File

@ -0,0 +1,39 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-livy-controller-service-api-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-controller-service-api</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,209 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.

View File

@ -0,0 +1,47 @@
nifi-livy-controller-service-api-nar
Copyright 2014-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Copyright 2014-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation

View File

@ -0,0 +1,33 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-livy-controller-service-api</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.api.livy;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Map;
import org.apache.nifi.controller.ControllerService;
public interface LivySessionService extends ControllerService {
String APPLICATION_JSON = "application/json";
String USER = "nifi";
String GET = "GET";
String POST = "POST";
Map<String, String> getSession();
HttpURLConnection getConnection(String urlString) throws IOException;
}

View File

@ -0,0 +1,63 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-livy-controller-service</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-controller-service-api</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,554 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.livy;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.apache.nifi.controller.api.livy.LivySessionService;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
@Tags({"Livy", "REST", "Spark", "http"})
@CapabilityDescription("Manages pool of Spark sessions over HTTP")
public class LivySessionController extends AbstractControllerService implements LivySessionService {
public static final PropertyDescriptor LIVY_HOST = new PropertyDescriptor.Builder()
.name("livy-cs-livy-host")
.displayName("Livy Host")
.description("The hostname (or IP address) of the Livy server.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor LIVY_PORT = new PropertyDescriptor.Builder()
.name("livy-cs-livy-port")
.displayName("Livy Port")
.description("The port number for the Livy server.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("8998")
.build();
public static final PropertyDescriptor SESSION_POOL_SIZE = new PropertyDescriptor.Builder()
.name("livy-cs-session-pool-size")
.displayName("Session Pool Size")
.description("Number of sessions to keep open")
.required(true)
.defaultValue("2")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor SESSION_TYPE = new PropertyDescriptor.Builder()
.name("livy-cs-session-kind")
.displayName("Session Type")
.description("The type of Spark session to start (spark, pyspark, pyspark3, sparkr, e.g.)")
.required(true)
.allowableValues("spark", "pyspark", "pyspark3", "sparkr")
.defaultValue("spark")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SESSION_MGR_STATUS_INTERVAL = new PropertyDescriptor.Builder()
.name("livy-cs-session-manager-status-interval")
.displayName("Session Manager Status Interval")
.description("The amount of time to wait between requesting session information updates.")
.required(true)
.defaultValue("2 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor JARS = new PropertyDescriptor.Builder()
.name("livy-cs-session-jars")
.displayName("Session JARs")
.description("JARs to be used in the Spark session.")
.required(false)
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.FILE_EXISTS_VALIDATOR))
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor FILES = new PropertyDescriptor.Builder()
.name("livy-cs-session-files")
.displayName("Session Files")
.description("Files to be used in the Spark session.")
.required(false)
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.FILE_EXISTS_VALIDATOR))
.defaultValue(null)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
.name("Connection Timeout")
.description("Max wait time for connection to remote service.")
.required(true)
.defaultValue("5 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
private volatile String livyUrl;
private volatile int sessionPoolSize;
private volatile String controllerKind;
private volatile String jars;
private volatile String files;
private volatile Map<Integer, JSONObject> sessions = new ConcurrentHashMap<>();
private volatile SSLContextService sslContextService;
private volatile SSLContext sslContext;
private volatile int connectTimeout;
private volatile Thread livySessionManagerThread = null;
private volatile boolean enabled = true;
private List<PropertyDescriptor> properties;
@Override
protected void init(ControllerServiceInitializationContext config) {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(LIVY_HOST);
props.add(LIVY_PORT);
props.add(SESSION_POOL_SIZE);
props.add(SESSION_TYPE);
props.add(SESSION_MGR_STATUS_INTERVAL);
props.add(SSL_CONTEXT_SERVICE);
props.add(CONNECT_TIMEOUT);
props.add(JARS);
props.add(FILES);
properties = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@OnEnabled
public void onConfigured(final ConfigurationContext context) {
ComponentLog log = getLogger();
log.info("********** Starting Livy Session Controller Service...");
final String livyHost = context.getProperty(LIVY_HOST).evaluateAttributeExpressions().getValue();
final String livyPort = context.getProperty(LIVY_PORT).evaluateAttributeExpressions().getValue();
final String sessionPoolSize = context.getProperty(SESSION_POOL_SIZE).evaluateAttributeExpressions().getValue();
final String sessionKind = context.getProperty(SESSION_TYPE).getValue();
final long sessionManagerStatusInterval = context.getProperty(SESSION_MGR_STATUS_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
final String jars = context.getProperty(JARS).evaluateAttributeExpressions().getValue();
final String files = context.getProperty(FILES).evaluateAttributeExpressions().getValue();
sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
connectTimeout = Math.toIntExact(context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
this.livyUrl = "http" + (sslContextService != null ? "s" : "") + "://" + livyHost + ":" + livyPort;
this.controllerKind = sessionKind;
this.jars = jars;
this.files = files;
this.sessionPoolSize = Integer.valueOf(sessionPoolSize);
this.enabled = true;
livySessionManagerThread = new Thread(() -> {
while (enabled) {
try {
manageSessions();
Thread.sleep(sessionManagerStatusInterval);
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("********** " + Thread.currentThread().getName()
+ " run() Interrupt Status: " + Thread.currentThread().isInterrupted());
enabled = false;
} catch (IOException ioe) {
throw new ProcessException(ioe);
}
}
});
livySessionManagerThread.setName("Livy-Session-Manager-" + controllerKind);
livySessionManagerThread.start();
}
@OnDisabled
public void shutdown() {
ComponentLog log = getLogger();
try {
log.info("********** Starting Livy Session Controller Service...");
enabled = false;
livySessionManagerThread.interrupt();
livySessionManagerThread.join();
} catch (InterruptedException e) {
log.error("Livy Session Manager Thread interrupted");
}
}
public Map<String, String> getSession() {
Map<String, String> sessionMap = new HashMap<>();
try {
final Map<Integer, JSONObject> sessionsCopy = sessions;
for (int sessionId : sessionsCopy.keySet()) {
JSONObject currentSession = sessions.get(sessionId);
String state = currentSession.getString("state");
String sessionKind = currentSession.getString("kind");
if (state.equalsIgnoreCase("idle") && sessionKind.equalsIgnoreCase(controllerKind)) {
sessionMap.put("sessionId", String.valueOf(sessionId));
sessionMap.put("livyUrl", livyUrl);
}
}
} catch (JSONException e) {
getLogger().error("Unexpected data found when looking for JSON object with 'state' and 'kind' fields", e);
}
return sessionMap;
}
public HttpURLConnection getConnection(String urlString) throws IOException {
URL url = new URL(urlString);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(connectTimeout);
if (sslContextService != null) {
try {
setSslSocketFactory((HttpsURLConnection) connection, sslContextService, sslContext);
} catch (KeyStoreException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyManagementException e) {
throw new IOException(e);
}
}
return connection;
}
private void manageSessions() throws InterruptedException, IOException {
int idleSessions = 0;
JSONObject newSessionInfo;
Map<Integer, JSONObject> sessionsInfo;
ComponentLog log = getLogger();
try {
sessionsInfo = listSessions();
if (sessions.isEmpty()) {
log.debug("********** manageSessions() the active session list is empty, populating from acquired list...");
sessions.putAll(sessionsInfo);
}
for (Integer sessionId : new ArrayList<>(sessions.keySet())) {
JSONObject currentSession = sessions.get(sessionId);
log.debug("********** manageSessions() Updating current session: " + currentSession);
if (sessionsInfo.containsKey(sessionId)) {
String state = currentSession.getString("state");
String sessionKind = currentSession.getString("kind");
log.debug("********** manageSessions() controller kind: {}, session kind: {}, session state: {}",
new Object[]{controllerKind, sessionKind, state});
if (state.equalsIgnoreCase("idle") && sessionKind.equalsIgnoreCase(controllerKind)) {
// Keep track of how many sessions are in an idle state and thus available
idleSessions++;
sessions.put(sessionId, sessionsInfo.get(sessionId));
// Remove session from session list source of truth snapshot since it has been dealt with
sessionsInfo.remove(sessionId);
} else if ((state.equalsIgnoreCase("busy") || state.equalsIgnoreCase("starting")) && sessionKind.equalsIgnoreCase(controllerKind)) {
// Update status of existing sessions
sessions.put(sessionId, sessionsInfo.get(sessionId));
// Remove session from session list source of truth snapshot since it has been dealt with
sessionsInfo.remove(sessionId);
} else {
// Prune sessions of kind != controllerKind and whose state is:
// not_started, shutting_down, error, dead, success (successfully stopped)
sessions.remove(sessionId);
//Remove session from session list source of truth snapshot since it has been dealt with
sessionsInfo.remove(sessionId);
}
} else {
// Prune sessions that no longer exist
log.debug("********** manageSessions() session exists in session pool but not in source snapshot, removing from pool...");
sessions.remove(sessionId);
// Remove session from session list source of truth snapshot since it has been dealt with
sessionsInfo.remove(sessionId);
}
}
int numSessions = sessions.size();
log.debug("********** manageSessions() There are " + numSessions + " sessions in the pool");
// Open new sessions equal to the number requested by sessionPoolSize
if (numSessions == 0) {
for (int i = 0; i < sessionPoolSize; i++) {
newSessionInfo = openSession();
sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
log.debug("********** manageSessions() Registered new session: " + newSessionInfo);
}
} else {
// Open one new session if there are no idle sessions
if (idleSessions == 0) {
log.debug("********** manageSessions() There are " + numSessions + " sessions in the pool but none of them are idle sessions, creating...");
newSessionInfo = openSession();
sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
log.debug("********** manageSessions() Registered new session: " + newSessionInfo);
}
// Open more sessions if number of sessions is less than target pool size
if (numSessions < sessionPoolSize) {
log.debug("********** manageSessions() There are " + numSessions + ", need more sessions to equal requested pool size of " + sessionPoolSize + ", creating...");
for (int i = 0; i < sessionPoolSize - numSessions; i++) {
newSessionInfo = openSession();
sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
log.debug("********** manageSessions() Registered new session: " + newSessionInfo);
}
}
}
} catch (ConnectException | SocketTimeoutException ce) {
log.error("Timeout connecting to Livy service to retrieve sessions", ce);
} catch (JSONException e) {
throw new IOException(e);
}
}
private Map<Integer, JSONObject> listSessions() throws IOException {
String sessionsUrl = livyUrl + "/sessions";
int numSessions;
JSONObject sessionsInfo;
Map<Integer, JSONObject> sessionsMap = new HashMap<>();
Map<String, String> headers = new HashMap<>();
headers.put("Content-Type", APPLICATION_JSON);
headers.put("X-Requested-By", USER);
try {
sessionsInfo = readJSONFromUrl(sessionsUrl, headers);
numSessions = sessionsInfo.getJSONArray("sessions").length();
for (int i = 0; i < numSessions; i++) {
int currentSessionId = sessionsInfo.getJSONArray("sessions").getJSONObject(i).getInt("id");
JSONObject currentSession = sessionsInfo.getJSONArray("sessions").getJSONObject(i);
sessionsMap.put(currentSessionId, currentSession);
}
} catch (JSONException e) {
throw new IOException(e);
}
return sessionsMap;
}
private JSONObject getSessionInfo(int sessionId) throws IOException {
String sessionUrl = livyUrl + "/sessions/" + sessionId;
JSONObject sessionInfo;
Map<String, String> headers = new HashMap<>();
headers.put("Content-Type", APPLICATION_JSON);
headers.put("X-Requested-By", USER);
try {
sessionInfo = readJSONFromUrl(sessionUrl, headers);
} catch (JSONException e) {
throw new IOException(e);
}
return sessionInfo;
}
private JSONObject openSession() throws IOException, JSONException, InterruptedException {
ComponentLog log = getLogger();
JSONObject newSessionInfo;
final ObjectMapper mapper = new ObjectMapper();
String sessionsUrl = livyUrl + "/sessions";
StringBuilder payload = new StringBuilder("{\"kind\":\"" + controllerKind + "\"");
if (jars != null) {
List<String> jarsArray = Arrays.stream(jars.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim).collect(Collectors.toList());
String jarsJsonArray = mapper.writeValueAsString(jarsArray);
payload.append(",\"jars\":");
payload.append(jarsJsonArray);
}
if (files != null) {
List<String> filesArray = Arrays.stream(files.split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim).collect(Collectors.toList());
String filesJsonArray = mapper.writeValueAsString(filesArray);
payload.append(",\"files\":");
payload.append(filesJsonArray);
}
payload.append("}");
log.debug("********** openSession() Session Payload: " + payload.toString());
Map<String, String> headers = new HashMap<>();
headers.put("Content-Type", APPLICATION_JSON);
headers.put("X-Requested-By", USER);
newSessionInfo = readJSONObjectFromUrlPOST(sessionsUrl, headers, payload.toString());
Thread.sleep(1000);
while (newSessionInfo.getString("state").equalsIgnoreCase("starting")) {
log.debug("********** openSession() Waiting for session to start...");
newSessionInfo = getSessionInfo(newSessionInfo.getInt("id"));
log.debug("********** openSession() newSessionInfo: " + newSessionInfo);
Thread.sleep(1000);
}
return newSessionInfo;
}
private JSONObject readJSONObjectFromUrlPOST(String urlString, Map<String, String> headers, String payload) throws IOException, JSONException {
URL url = new URL(urlString);
HttpURLConnection connection = getConnection(urlString);
connection.setRequestMethod(POST);
connection.setDoOutput(true);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
OutputStream os = connection.getOutputStream();
os.write(payload.getBytes());
os.flush();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage());
}
InputStream content = connection.getInputStream();
return readAllIntoJSONObject(content);
}
private JSONArray readJSONArrayFromUrlPOST(String urlString, Map<String, String> headers, String payload) throws IOException, JSONException {
URL url = new URL(urlString);
HttpURLConnection connection = getConnection(urlString);
connection.setRequestMethod(POST);
connection.setDoOutput(true);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
OutputStream os = connection.getOutputStream();
os.write(payload.getBytes());
os.flush();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage());
}
InputStream content = connection.getInputStream();
return readAllIntoJSONArray(content);
}
private JSONObject readJSONFromUrl(String urlString, Map<String, String> headers) throws IOException, JSONException {
HttpURLConnection connection = getConnection(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
connection.setRequestMethod(GET);
connection.setDoOutput(true);
InputStream content = connection.getInputStream();
return readAllIntoJSONObject(content);
}
private JSONArray readJSONArrayFromUrl(String urlString, Map<String, String> headers) throws IOException, JSONException {
HttpURLConnection connection = getConnection(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
connection.setRequestMethod(GET);
connection.setDoOutput(true);
InputStream content = connection.getInputStream();
return readAllIntoJSONArray(content);
}
private JSONObject readAllIntoJSONObject(InputStream content) throws IOException, JSONException {
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONObject(jsonText);
}
private JSONArray readAllIntoJSONArray(InputStream content) throws IOException, JSONException {
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONArray(jsonText);
}
private void setSslSocketFactory(HttpsURLConnection httpsURLConnection, SSLContextService sslService, SSLContext sslContext)
throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException {
final String keystoreLocation = sslService.getKeyStoreFile();
final String keystorePass = sslService.getKeyStorePassword();
final String keystoreType = sslService.getKeyStoreType();
// prepare the keystore
final KeyStore keyStore = KeyStore.getInstance(keystoreType);
try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
keyStore.load(keyStoreStream, keystorePass.toCharArray());
}
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, keystorePass.toCharArray());
// load truststore
final String truststoreLocation = sslService.getTrustStoreFile();
final String truststorePass = sslService.getTrustStorePassword();
final String truststoreType = sslService.getTrustStoreType();
KeyStore truststore = KeyStore.getInstance(truststoreType);
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
truststore.load(new FileInputStream(truststoreLocation), truststorePass.toCharArray());
trustManagerFactory.init(truststore);
sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
httpsURLConnection.setSSLSocketFactory(sslSocketFactory);
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.controller.livy.LivySessionController

View File

@ -0,0 +1,48 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-livy-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-controller-service-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-controller-service</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-processors</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,209 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.

View File

@ -0,0 +1,55 @@
nifi-livy-nar
Copyright 2014-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Configuration
The following NOTICE information applies:
Apache Commons Configuration
Copyright 2001-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation

View File

@ -0,0 +1,81 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-livy-processors</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-controller-service-api</artifactId>
<version>1.5.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-controller-service</artifactId>
<version>1.5.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.livy;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.apache.nifi.controller.api.livy.LivySessionService;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"spark", "livy", "http", "execute"})
@CapabilityDescription("Execute Spark Code over a Livy-managed HTTP session to a live Spark context. Supports cached RDD sharing.")
public class ExecuteSparkInteractive extends AbstractProcessor {
public static final PropertyDescriptor LIVY_CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
.name("exec-spark-iactive-livy-controller-service")
.displayName("Livy Controller Service")
.description("The controller service to use for Livy-managed session(s).")
.required(true)
.identifiesControllerService(LivySessionService.class)
.build();
public static final PropertyDescriptor CODE = new PropertyDescriptor.Builder()
.name("exec-spark-iactive-code")
.displayName("Code")
.description("The code to execute in the session. This property can be empty, a constant value, or built from attributes "
+ "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ "to contain valid code to be issued by the processor to the session. Note that Expression "
+ "Language is not evaluated for flow file contents.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
/**
* Points to the charset name corresponding to the incoming flow file's encoding.
*/
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("exec-spark-iactive-charset")
.displayName("Character Set")
.description("The character set encoding for the incoming flow file.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor STATUS_CHECK_INTERVAL = new PropertyDescriptor.Builder()
.name("exec-spark-iactive-status-check-interval")
.displayName("Status Check Interval")
.description("The amount of time to wait between checking the status of an operation.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("1 sec")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are successfully processed are sent to this relationship")
.build();
public static final Relationship REL_WAIT = new Relationship.Builder()
.name("wait")
.description("FlowFiles that are waiting on an available Spark session will be sent to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to this relationship when they cannot be parsed")
.build();
private volatile List<PropertyDescriptor> properties;
private volatile Set<Relationship> relationships;
public void init(final ProcessorInitializationContext context) {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LIVY_CONTROLLER_SERVICE);
properties.add(CODE);
properties.add(CHARSET);
properties.add(STATUS_CHECK_INTERVAL);
this.properties = Collections.unmodifiableList(properties);
Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_WAIT);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog log = getLogger();
final LivySessionService livySessionService = context.getProperty(LIVY_CONTROLLER_SERVICE).asControllerService(LivySessionService.class);
final Map<String, String> livyController = livySessionService.getSession();
if (livyController == null || livyController.isEmpty()) {
log.debug("No Spark session available (yet), routing flowfile to wait");
session.transfer(flowFile, REL_WAIT);
return;
}
final long statusCheckInterval = context.getProperty(STATUS_CHECK_INTERVAL).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
Charset charset;
try {
charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
} catch (Exception e) {
log.warn("Illegal character set name specified, defaulting to UTF-8");
charset = StandardCharsets.UTF_8;
}
String sessionId = livyController.get("sessionId");
String livyUrl = livyController.get("livyUrl");
String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(code)) {
try (InputStream inputStream = session.read(flowFile)) {
// If no code was provided, assume it is in the content of the incoming flow file
code = IOUtils.toString(inputStream, charset);
} catch (IOException ioe) {
log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
code = StringEscapeUtils.escapeJavaScript(code);
String payload = "{\"code\":\"" + code + "\"}";
try {
final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
log.debug("********** ExecuteSparkInteractive Result of Job Submit: " + result);
if (result == null) {
session.transfer(flowFile, REL_FAILURE);
} else {
try {
final JSONObject output = result.getJSONObject("data");
flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
session.transfer(flowFile, REL_SUCCESS);
} catch (JSONException je) {
// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
} catch (IOException ioe) {
log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
private JSONObject submitAndHandleJob(String livyUrl, LivySessionService livySessionService, String sessionId, String payload, long statusCheckInterval) throws IOException {
ComponentLog log = getLogger();
String statementUrl = livyUrl + "/sessions/" + sessionId + "/statements";
JSONObject output = null;
Map<String, String> headers = new HashMap<>();
headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
headers.put("X-Requested-By", LivySessionService.USER);
headers.put("Accept", "application/json");
log.debug("********** submitAndHandleJob() Submitting Job to Spark via: " + statementUrl);
try {
JSONObject jobInfo = readJSONObjectFromUrlPOST(statementUrl, livySessionService, headers, payload);
log.debug("********** submitAndHandleJob() Job Info: " + jobInfo);
String statementId = String.valueOf(jobInfo.getInt("id"));
statementUrl = statementUrl + "/" + statementId;
jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, headers);
String jobState = jobInfo.getString("state");
log.debug("********** submitAndHandleJob() New Job Info: " + jobInfo);
Thread.sleep(statusCheckInterval);
if (jobState.equalsIgnoreCase("available")) {
log.debug("********** submitAndHandleJob() Job status is: " + jobState + ". returning output...");
output = jobInfo.getJSONObject("output");
} else if (jobState.equalsIgnoreCase("running") || jobState.equalsIgnoreCase("waiting")) {
while (!jobState.equalsIgnoreCase("available")) {
log.debug("********** submitAndHandleJob() Job status is: " + jobState + ". Waiting for job to complete...");
Thread.sleep(statusCheckInterval);
jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, headers);
jobState = jobInfo.getString("state");
}
output = jobInfo.getJSONObject("output");
} else if (jobState.equalsIgnoreCase("error")
|| jobState.equalsIgnoreCase("cancelled")
|| jobState.equalsIgnoreCase("cancelling")) {
log.debug("********** Job status is: " + jobState + ". Job did not complete due to error or has been cancelled. Check SparkUI for details.");
throw new IOException(jobState);
}
} catch (JSONException | InterruptedException e) {
throw new IOException(e);
}
return output;
}
private JSONObject readJSONObjectFromUrlPOST(String urlString, LivySessionService livySessionService, Map<String, String> headers, String payload)
throws IOException, JSONException {
HttpURLConnection connection = livySessionService.getConnection(urlString);
connection.setRequestMethod("POST");
connection.setDoOutput(true);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
OutputStream os = connection.getOutputStream();
os.write(payload.getBytes());
os.flush();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage());
}
InputStream content = connection.getInputStream();
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONObject(jsonText);
}
private JSONObject readJSONObjectFromUrl(final String urlString, LivySessionService livySessionService, final Map<String, String> headers)
throws IOException, JSONException {
HttpURLConnection connection = livySessionService.getConnection(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
connection.setRequestMethod("GET");
connection.setDoOutput(true);
InputStream content = connection.getInputStream();
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONObject(jsonText);
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.livy.ExecuteSparkInteractive

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.livy;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
public class ExecuteSparkInteractiveTestBase {
public static class LivyAPIHandler extends AbstractHandler {
int session1Requests = 0;
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
baseRequest.setHandled(true);
response.setStatus(200);
if ("GET".equalsIgnoreCase(request.getMethod())) {
String responseBody = "{}";
response.setContentType("application/json");
if ("/sessions".equalsIgnoreCase(target)) {
responseBody = "{\"sessions\": [{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}]}";
} else if (target.startsWith("/sessions/") && !target.contains("statement")) {
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
} else if ("/sessions/1/statements/7".equalsIgnoreCase(target)) {
switch (session1Requests) {
case 0:
responseBody = "{\"state\": \"waiting\"}";
break;
case 1:
responseBody = "{\"state\": \"running\"}";
break;
case 2:
responseBody = "{\"state\": \"available\", \"output\": {\"data\": {\"text/plain\": \"Hello world\"}}}";
break;
default:
responseBody = "{\"state\": \"error\"}";
break;
}
session1Requests++;
}
response.setContentLength(responseBody.length());
try (PrintWriter writer = response.getWriter()) {
writer.print(responseBody);
writer.flush();
}
} else if ("POST".equalsIgnoreCase(request.getMethod())) {
String responseBody = "{}";
response.setContentType("application/json");
if ("/sessions".equalsIgnoreCase(target)) {
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
} else if ("/sessions/1/statements".equalsIgnoreCase(target)) {
responseBody = "{\"id\": 7}";
}
response.setContentLength(responseBody.length());
try (PrintWriter writer = response.getWriter()) {
writer.print(responseBody);
writer.flush();
}
}
}
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.livy;
import org.apache.nifi.controller.livy.LivySessionController;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.server.Handler;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase {
public static TestServer server;
public static String url;
public TestRunner runner;
@BeforeClass
public static void beforeClass() throws Exception {
// useful for verbose logging output
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
// create a Jetty server on a random port
server = createServer();
server.startServer();
// this is the base url with the random port
url = server.getUrl();
}
public void addHandler(Handler handler) {
server.addHandler(handler);
}
@AfterClass
public static void afterClass() throws Exception {
server.shutdownServer();
}
@Before
public void before() throws Exception {
runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class);
LivySessionController livyControllerService = new LivySessionController();
runner.addControllerService("livyCS", livyControllerService);
runner.setProperty(livyControllerService, LivySessionController.LIVY_HOST, url.substring(url.indexOf("://") + 3, url.lastIndexOf(":")));
runner.setProperty(livyControllerService, LivySessionController.LIVY_PORT, url.substring(url.lastIndexOf(":") + 1));
runner.enableControllerService(livyControllerService);
runner.setProperty(ExecuteSparkInteractive.LIVY_CONTROLLER_SERVICE, "livyCS");
server.clearHandlers();
}
@After
public void after() {
runner.shutdown();
}
private static TestServer createServer() throws IOException {
return new TestServer();
}
@Test
public void testSparkSession() throws Exception {
addHandler(new LivyAPIHandler());
runner.enqueue("print \"hello world\"");
runner.run();
List<MockFlowFile> waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
while (!waitingFlowfiles.isEmpty()) {
Thread.sleep(1000);
runner.clearTransferState();
runner.enqueue("print \"hello world\"");
runner.run();
waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
}
runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.livy;
import org.apache.nifi.controller.livy.LivySessionController;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.server.Handler;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestBase {
private static Map<String, String> sslProperties;
public static TestServer server;
public static String url;
public TestRunner runner;
@BeforeClass
public static void beforeClass() throws Exception {
// useful for verbose logging output
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
// create the SSL properties, which basically store keystore / trustore information
// this is used by the StandardSSLContextService and the Jetty Server
sslProperties = createSslProperties();
// create a Jetty server on a random port
server = createServer();
server.startServer();
// Allow time for the server to start
Thread.sleep(1000);
// this is the base url with the random port
url = server.getSecureUrl();
}
public void addHandler(Handler handler) {
server.addHandler(handler);
}
@AfterClass
public static void afterClass() throws Exception {
server.shutdownServer();
}
@Before
public void before() throws Exception {
runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class);
final StandardSSLContextService sslService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslService, sslProperties);
runner.enableControllerService(sslService);
// Allow time for the controller service to fully initialize
Thread.sleep(500);
LivySessionController livyControllerService = new LivySessionController();
runner.addControllerService("livyCS", livyControllerService);
runner.setProperty(livyControllerService, LivySessionController.LIVY_HOST, url.substring(url.indexOf("://") + 3, url.lastIndexOf(":")));
runner.setProperty(livyControllerService, LivySessionController.LIVY_PORT, url.substring(url.lastIndexOf(":") + 1));
runner.setProperty(livyControllerService, LivySessionController.SSL_CONTEXT_SERVICE, "ssl-context");
runner.enableControllerService(livyControllerService);
runner.setProperty(ExecuteSparkInteractive.LIVY_CONTROLLER_SERVICE, "livyCS");
server.clearHandlers();
}
@After
public void after() {
runner.shutdown();
}
private static TestServer createServer() throws IOException {
return new TestServer(sslProperties);
}
@Test
public void testSslSparkSession() throws Exception {
addHandler(new LivyAPIHandler());
runner.enqueue("print \"hello world\"");
runner.run();
List<MockFlowFile> waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
while (!waitingFlowfiles.isEmpty()) {
Thread.sleep(1000);
runner.clearTransferState();
runner.enqueue("print \"hello world\"");
runner.run();
waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
}
runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
}
private static Map<String, String> createSslProperties() {
final Map<String, String> map = new HashMap<>();
map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
return map;
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.livy;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import java.util.Map;
/**
* Test server to assist with unit tests that requires a server to be stood up.
*/
/**
* Test server to assist with unit tests that requires a server to be stood up.
*/
public class TestServer {
public static final String NEED_CLIENT_AUTH = "clientAuth";
private Server jetty;
private boolean secure = false;
/**
* Creates the test server.
*/
public TestServer() {
createServer(null);
}
/**
* Creates the test server.
*
* @param sslProperties SSLProps to be used in the secure connection. The keys should should use the StandardSSLContextService properties.
*/
public TestServer(final Map<String, String> sslProperties) {
createServer(sslProperties);
}
private void createServer(final Map<String, String> sslProperties) {
jetty = new Server();
// create the unsecure connector
createConnector();
// create the secure connector if sslProperties are specified
if (sslProperties != null) {
createSecureConnector(sslProperties);
}
jetty.setHandler(new HandlerCollection(true));
}
/**
* Creates the http connection
*/
private void createConnector() {
final ServerConnector http = new ServerConnector(jetty);
http.setPort(0);
// Severely taxed environments may have significant delays when executing.
http.setIdleTimeout(30000L);
jetty.addConnector(http);
}
private void createSecureConnector(final Map<String, String> sslProperties) {
SslContextFactory ssl = new SslContextFactory();
if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
}
if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
}
final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
if (clientAuth == null) {
ssl.setNeedClientAuth(true);
} else {
ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth));
}
// build the connector
final ServerConnector https = new ServerConnector(jetty, ssl);
// set host and port
https.setPort(0);
// Severely taxed environments may have significant delays when executing.
https.setIdleTimeout(30000L);
// add the connector
jetty.addConnector(https);
// mark secure as enabled
secure = true;
}
public void clearHandlers() {
HandlerCollection hc = (HandlerCollection) jetty.getHandler();
Handler[] ha = hc.getHandlers();
if (ha != null) {
for (Handler h : ha) {
hc.removeHandler(h);
}
}
}
public void addHandler(Handler handler) {
((HandlerCollection) jetty.getHandler()).addHandler(handler);
}
public void startServer() throws Exception {
jetty.start();
}
public void shutdownServer() throws Exception {
jetty.stop();
jetty.destroy();
}
private int getPort() {
if (!jetty.isStarted()) {
throw new IllegalStateException("Jetty server not started");
}
return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
}
private int getSecurePort() {
if (!jetty.isStarted()) {
throw new IllegalStateException("Jetty server not started");
}
return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort();
}
public String getUrl() {
return "http://localhost:" + getPort();
}
public String getSecureUrl() {
String url = null;
if (secure) {
url = "https://localhost:" + getSecurePort();
}
return url;
}
}

View File

@ -0,0 +1,85 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-spark-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-livy-nar</module>
<module>nifi-livy-controller-service-api-nar</module>
<module>nifi-livy-controller-service-api</module>
<module>nifi-livy-controller-service</module>
<module>nifi-livy-processors</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>1.3.8</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>1.19.1</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -89,6 +89,7 @@
<module>nifi-extension-utils</module>
<module>nifi-redis-bundle</module>
<module>nifi-metrics-reporting-bundle</module>
<module>nifi-spark-bundle</module>
</modules>
<build>

14
pom.xml
View File

@ -1462,7 +1462,19 @@
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-controller-service-api-nar</artifactId>
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-livy-nar</artifactId>
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
<version>1.5.0-SNAPSHOT</version>