diff --git a/NOTICE b/NOTICE
index a4f0bb731f..e0bb225a7a 100644
--- a/NOTICE
+++ b/NOTICE
@@ -20,6 +20,12 @@ This includes derived works from the Apache Storm (ASLv2 licensed) project (http
org/apache/storm/hive/common/HiveOptions.java
and can be found in the org.apache.nifi.util.hive package
+This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop):
+ Copyright 2014 The Apache Software Foundation
+ The derived work is adapted from
+ org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
+ and can be found in the org.apache.nifi.hadoop package
+
This includes derived works from the Apache Hive (ASLv2 licensed) project (https://github.com/apache/hive):
Copyright 2008-2016 The Apache Software Foundation
The derived work is adapted from
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index e27e23467f..9f413d6fb5 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -20,6 +20,12 @@ This includes derived works from the Apache Storm (ASLv2 licensed) project (http
org/apache/storm/hive/common/HiveOptions.java
and can be found in the org.apache.nifi.util.hive package
+This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop):
+ Copyright 2014 The Apache Software Foundation
+ The derived work is adapted from
+ org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
+ and can be found in the org.apache.nifi.hadoop package
+
This includes derived works from the Apache Hive (ASLv2 licensed) project (https://github.com/apache/hive):
Copyright 2008-2016 The Apache Software Foundation
The derived work is adapted from
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
index 12da3f7d69..7d9b681b67 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
@@ -52,6 +52,18 @@
${hadoop.version}
provided
+
+ org.apache.hadoop
+ hadoop-auth
+ ${hadoop.version}
+ provided
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.5
+ provided
+
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosConfiguration.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosConfiguration.java
new file mode 100644
index 0000000000..ade65fedd3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Modified Kerberos configuration class from {@link org.apache.hadoop.security.authentication.client.KerberosAuthenticator.KerberosConfiguration}
+ * that requires authentication from a keytab.
+ */
+public class KerberosConfiguration extends javax.security.auth.login.Configuration {
+
+ private static final Map USER_KERBEROS_OPTIONS = new HashMap<>();
+ private static final AppConfigurationEntry USER_KERBEROS_LOGIN;
+ private static final AppConfigurationEntry[] USER_KERBEROS_CONF;
+
+ KerberosConfiguration(String principal, String keytab) {
+ USER_KERBEROS_OPTIONS.put("principal", principal);
+ USER_KERBEROS_OPTIONS.put("keyTab", keytab);
+ }
+
+ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+ return USER_KERBEROS_CONF;
+ }
+
+ static {
+ USER_KERBEROS_OPTIONS.put("doNotPrompt", "true");
+ USER_KERBEROS_OPTIONS.put("useKeyTab", "true");
+ USER_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
+ USER_KERBEROS_LOGIN = new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(), AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, USER_KERBEROS_OPTIONS);
+ USER_KERBEROS_CONF = new AppConfigurationEntry[]{USER_KERBEROS_LOGIN};
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabCredentials.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabCredentials.java
new file mode 100644
index 0000000000..24a43542c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabCredentials.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.http.auth.Credentials;
+
+import javax.security.auth.kerberos.KerberosPrincipal;
+import java.security.Principal;
+
+/**
+ * Credentials that incorporate a user principal and a keytab file.
+ */
+public class KerberosKeytabCredentials implements Credentials {
+
+ private final KerberosPrincipal userPrincipal;
+ private final String keytab;
+
+ public KerberosKeytabCredentials(String principalName, String keytab) {
+ this.userPrincipal = new KerberosPrincipal(principalName);
+ this.keytab = keytab;
+ }
+
+ @Override
+ public Principal getUserPrincipal() {
+ return userPrincipal;
+ }
+
+ @Override
+ public String getPassword() {
+ return null;
+ }
+
+ public String getKeytab() {
+ return keytab;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoAuthSchemeProvider.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoAuthSchemeProvider.java
new file mode 100644
index 0000000000..295b765844
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoAuthSchemeProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.http.auth.AuthScheme;
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * Provider class for KerberosKeytabSPNegoScheme.
+ */
+public class KerberosKeytabSPNegoAuthSchemeProvider implements AuthSchemeProvider {
+
+ public AuthScheme create(HttpContext context) {
+ return new KerberosKeytabSPNegoScheme();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoScheme.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoScheme.java
new file mode 100644
index 0000000000..fd7171c604
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosKeytabSPNegoScheme.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.http.auth.Credentials;
+import org.apache.http.impl.auth.SPNegoScheme;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.net.UnknownHostException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class provides a very similar authentication scheme and token generation as {@link SPNegoScheme} does.
+ * The token generation is based on a keytab file coming from {@link KerberosKeytabCredentials} and the process
+ * uses hadoop-auth tools.
+ */
+public class KerberosKeytabSPNegoScheme extends SPNegoScheme {
+
+ public KerberosKeytabSPNegoScheme() {
+ super(true, false);
+ }
+
+ @Override
+ public byte[] generateToken(byte[] input, String authServer, Credentials credentials) {
+ Set principals = new HashSet<>();
+ principals.add(credentials.getUserPrincipal());
+ Subject subject = new Subject(false, principals, new HashSet<>(), new HashSet<>());
+
+ try {
+ LoginContext loginContext = new LoginContext("", subject, null,
+ new KerberosConfiguration(credentials.getUserPrincipal().getName(),
+ ((KerberosKeytabCredentials) credentials).getKeytab()));
+ loginContext.login();
+ Subject loggedInSubject = loginContext.getSubject();
+
+ return Subject.doAs(loggedInSubject, new PrivilegedExceptionAction() {
+
+ public byte[] run() throws UnknownHostException, ClassNotFoundException, GSSException,
+ IllegalAccessException, NoSuchFieldException {
+ GSSManager gssManager = GSSManager.getInstance();
+ String servicePrincipal = KerberosUtil.getServicePrincipal("HTTP", authServer);
+ Oid serviceOid = KerberosUtil.getOidInstance("NT_GSS_KRB5_PRINCIPAL");
+ GSSName serviceName = gssManager.createName(servicePrincipal, serviceOid);
+ Oid mechOid = KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID");
+ GSSContext gssContext = gssManager.createContext(serviceName, mechOid, null, 0);
+ gssContext.requestCredDeleg(true);
+ gssContext.requestMutualAuth(true);
+ return gssContext.initSecContext(input, 0, input.length);
+ }
+
+ });
+ } catch (PrivilegedActionException | LoginException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..823ddc6428
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,11 @@
+nifi-hadoop-utils
+Copyright 2014-2018 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop):
+ Copyright 2014 The Apache Software Foundation
+ The derived work is adapted from
+ org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
+ and can be found in the org.apache.nifi.hadoop package
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml
index 77650654bc..c954e2d185 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml
@@ -28,5 +28,10 @@
nifi-api
provided
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.5
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java
index 7627aa42e5..f75170f18b 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java
@@ -17,18 +17,17 @@
package org.apache.nifi.controller.api.livy;
import java.io.IOException;
-import java.net.HttpURLConnection;
import java.util.Map;
+import org.apache.http.client.HttpClient;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
public interface LivySessionService extends ControllerService {
String APPLICATION_JSON = "application/json";
String USER = "nifi";
- String GET = "GET";
- String POST = "POST";
- Map getSession();
+ Map getSession() throws SessionManagerException;
- HttpURLConnection getConnection(String urlString) throws IOException;
+ HttpClient getConnection() throws IOException, SessionManagerException;
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/exception/SessionManagerException.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/exception/SessionManagerException.java
new file mode 100644
index 0000000000..77b63f0d2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/exception/SessionManagerException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.api.livy.exception;
+
+public class SessionManagerException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public SessionManagerException(final Throwable t) {
+ super(t);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml
index 84eedf92a7..56aa4f65c6 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml
@@ -60,5 +60,29 @@
org.codehaus.jackson
jackson-mapper-asl
+
+ org.apache.nifi
+ nifi-kerberos-credentials-service-api
+
+
+ org.apache.nifi
+ nifi-hadoop-utils
+ 1.7.0-SNAPSHOT
+
+
+ org.apache.hadoop
+ hadoop-auth
+ 2.7.3
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ log4j
+ log4j
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java
index f9ded28b6e..77b146a1c2 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java
@@ -21,11 +21,8 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStream;
import java.net.ConnectException;
-import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
-import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStore;
@@ -45,6 +42,22 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.config.Lookup;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -53,8 +66,11 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
+import org.apache.nifi.hadoop.KerberosKeytabCredentials;
+import org.apache.nifi.hadoop.KerberosKeytabSPNegoAuthSchemeProvider;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.codehaus.jackson.map.ObjectMapper;
@@ -64,10 +80,8 @@ import org.codehaus.jettison.json.JSONObject;
import org.apache.nifi.controller.api.livy.LivySessionService;
import org.apache.nifi.expression.ExpressionLanguageScope;
-import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
@Tags({"Livy", "REST", "Spark", "http"})
@@ -157,6 +171,14 @@ public class LivySessionController extends AbstractControllerService implements
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
+ static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+ .name("kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .required(false)
+ .build();
+
private volatile String livyUrl;
private volatile int sessionPoolSize;
private volatile String controllerKind;
@@ -168,6 +190,8 @@ public class LivySessionController extends AbstractControllerService implements
private volatile int connectTimeout;
private volatile Thread livySessionManagerThread = null;
private volatile boolean enabled = true;
+ private volatile KerberosCredentialsService credentialsService;
+ private volatile SessionManagerException sessionManagerException;
private List properties;
@@ -183,6 +207,7 @@ public class LivySessionController extends AbstractControllerService implements
props.add(CONNECT_TIMEOUT);
props.add(JARS);
props.add(FILES);
+ props.add(KERBEROS_CREDENTIALS_SERVICE);
properties = Collections.unmodifiableList(props);
}
@@ -204,6 +229,7 @@ public class LivySessionController extends AbstractControllerService implements
sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
connectTimeout = Math.toIntExact(context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
+ credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
this.livyUrl = "http" + (sslContextService != null ? "s" : "") + "://" + livyHost + ":" + livyPort;
this.controllerKind = sessionKind;
@@ -216,12 +242,16 @@ public class LivySessionController extends AbstractControllerService implements
while (enabled) {
try {
manageSessions();
+ sessionManagerException = null;
+ } catch (Exception e) {
+ getLogger().error("Livy Session Manager Thread run into an error, but continues to run", e);
+ sessionManagerException = new SessionManagerException(e);
+ }
+ try {
Thread.sleep(sessionManagerStatusInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
enabled = false;
- } catch (IOException ioe) {
- throw new ProcessException(ioe);
}
}
});
@@ -243,7 +273,9 @@ public class LivySessionController extends AbstractControllerService implements
}
@Override
- public Map getSession() {
+ public Map getSession() throws SessionManagerException {
+ checkSessionManagerException();
+
Map sessionMap = new HashMap<>();
try {
final Map sessionsCopy = sessions;
@@ -254,6 +286,7 @@ public class LivySessionController extends AbstractControllerService implements
if (state.equalsIgnoreCase("idle") && sessionKind.equalsIgnoreCase(controllerKind)) {
sessionMap.put("sessionId", String.valueOf(sessionId));
sessionMap.put("livyUrl", livyUrl);
+ break;
}
}
} catch (JSONException e) {
@@ -263,18 +296,41 @@ public class LivySessionController extends AbstractControllerService implements
}
@Override
- public HttpURLConnection getConnection(String urlString) throws IOException {
- URL url = new URL(urlString);
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setConnectTimeout(connectTimeout);
+ public HttpClient getConnection() throws IOException, SessionManagerException {
+ checkSessionManagerException();
+
+ return openConnection();
+ }
+
+ private HttpClient openConnection() throws IOException {
+ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+
if (sslContextService != null) {
try {
- setSslSocketFactory((HttpsURLConnection) connection, sslContextService, sslContext);
+ SSLContext sslContext = getSslSocketFactory(sslContextService);
+ httpClientBuilder.setSSLContext(sslContext);
} catch (KeyStoreException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyManagementException e) {
throw new IOException(e);
}
}
- return connection;
+
+ if (credentialsService != null) {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(new AuthScope(null, -1, null),
+ new KerberosKeytabCredentials(credentialsService.getPrincipal(), credentialsService.getKeytab()));
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ Lookup authSchemeRegistry = RegistryBuilder. create()
+ .register(AuthSchemes.SPNEGO, new KerberosKeytabSPNegoAuthSchemeProvider()).build();
+ httpClientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
+ }
+
+ RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+ requestConfigBuilder.setConnectTimeout(connectTimeout);
+ requestConfigBuilder.setConnectionRequestTimeout(connectTimeout);
+ requestConfigBuilder.setSocketTimeout(connectTimeout);
+ httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
+
+ return httpClientBuilder.build();
}
private void manageSessions() throws InterruptedException, IOException {
@@ -439,36 +495,34 @@ public class LivySessionController extends AbstractControllerService implements
}
private JSONObject readJSONObjectFromUrlPOST(String urlString, Map headers, String payload) throws IOException, JSONException {
- HttpURLConnection connection = getConnection(urlString);
-
- connection.setRequestMethod(POST);
- connection.setDoOutput(true);
+ HttpClient httpClient = openConnection();
+ HttpPost request = new HttpPost(urlString);
for (Map.Entry entry : headers.entrySet()) {
- connection.setRequestProperty(entry.getKey(), entry.getValue());
+ request.addHeader(entry.getKey(), entry.getValue());
+ }
+ HttpEntity httpEntity = new StringEntity(payload);
+ request.setEntity(httpEntity);
+ HttpResponse response = httpClient.execute(request);
+
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) {
+ throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode() + " : " + response.getStatusLine().getReasonPhrase());
}
- OutputStream os = connection.getOutputStream();
- os.write(payload.getBytes());
- os.flush();
-
- if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
- throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage());
- }
-
- InputStream content = connection.getInputStream();
+ InputStream content = response.getEntity().getContent();
return readAllIntoJSONObject(content);
}
private JSONObject readJSONFromUrl(String urlString, Map headers) throws IOException, JSONException {
+ HttpClient httpClient = openConnection();
- HttpURLConnection connection = getConnection(urlString);
+ HttpGet request = new HttpGet(urlString);
for (Map.Entry entry : headers.entrySet()) {
- connection.setRequestProperty(entry.getKey(), entry.getValue());
+ request.addHeader(entry.getKey(), entry.getValue());
}
- connection.setRequestMethod(GET);
- connection.setDoOutput(true);
- InputStream content = connection.getInputStream();
+ HttpResponse response = httpClient.execute(request);
+
+ InputStream content = response.getEntity().getContent();
return readAllIntoJSONObject(content);
}
@@ -478,7 +532,7 @@ public class LivySessionController extends AbstractControllerService implements
return new JSONObject(jsonText);
}
- private void setSslSocketFactory(HttpsURLConnection httpsURLConnection, SSLContextService sslService, SSLContext sslContext)
+ private SSLContext getSslSocketFactory(SSLContextService sslService)
throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException {
final String keystoreLocation = sslService.getKeyStoreFile();
final String keystorePass = sslService.getKeyStorePassword();
@@ -506,7 +560,14 @@ public class LivySessionController extends AbstractControllerService implements
sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
- final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
- httpsURLConnection.setSSLSocketFactory(sslSocketFactory);
+ return sslContext;
}
+
+ private void checkSessionManagerException() throws SessionManagerException {
+ SessionManagerException exception = sessionManagerException;
+ if (exception != null) {
+ throw sessionManagerException;
+ }
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
index 7b021c3352..56e115d3a0 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
@@ -86,5 +86,10 @@
jetty-server
test
+
+ org.apache.nifi
+ nifi-kerberos-credentials-service-api
+ test
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
index dcb6b8249c..4a878429fe 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
@@ -20,8 +20,6 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -37,10 +35,18 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@@ -162,10 +168,19 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
final ComponentLog log = getLogger();
final LivySessionService livySessionService = context.getProperty(LIVY_CONTROLLER_SERVICE).asControllerService(LivySessionService.class);
- final Map livyController = livySessionService.getSession();
- if (livyController == null || livyController.isEmpty()) {
- log.debug("No Spark session available (yet), routing flowfile to wait");
+ final Map livyController;
+ try {
+ livyController = livySessionService.getSession();
+ if (livyController == null || livyController.isEmpty()) {
+ log.debug("No Spark session available (yet), routing flowfile to wait");
+ session.transfer(flowFile, REL_WAIT);
+ context.yield();
+ return;
+ }
+ } catch (SessionManagerException sme) {
+ log.error("Error opening spark session, routing flowfile to wait", sme);
session.transfer(flowFile, REL_WAIT);
+ context.yield();
return;
}
final long statusCheckInterval = context.getProperty(STATUS_CHECK_INTERVAL).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
@@ -214,14 +229,15 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
}
}
- } catch (IOException ioe) {
- log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+ } catch (IOException | SessionManagerException e) {
+ log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, e.getMessage()}, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
- private JSONObject submitAndHandleJob(String livyUrl, LivySessionService livySessionService, String sessionId, String payload, long statusCheckInterval) throws IOException {
+ private JSONObject submitAndHandleJob(String livyUrl, LivySessionService livySessionService, String sessionId, String payload, long statusCheckInterval)
+ throws IOException, SessionManagerException {
ComponentLog log = getLogger();
String statementUrl = livyUrl + "/sessions/" + sessionId + "/statements";
JSONObject output = null;
@@ -265,42 +281,42 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
}
private JSONObject readJSONObjectFromUrlPOST(String urlString, LivySessionService livySessionService, Map headers, String payload)
- throws IOException, JSONException {
-
- HttpURLConnection connection = livySessionService.getConnection(urlString);
- connection.setRequestMethod("POST");
- connection.setDoOutput(true);
+ throws IOException, JSONException, SessionManagerException {
+ HttpClient httpClient = livySessionService.getConnection();
+ HttpPost request = new HttpPost(urlString);
for (Map.Entry entry : headers.entrySet()) {
- connection.setRequestProperty(entry.getKey(), entry.getValue());
+ request.addHeader(entry.getKey(), entry.getValue());
+ }
+ HttpEntity httpEntity = new StringEntity(payload);
+ request.setEntity(httpEntity);
+ HttpResponse response = httpClient.execute(request);
+
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) {
+ throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode() + " : " + response.getStatusLine().getReasonPhrase());
}
- OutputStream os = connection.getOutputStream();
- os.write(payload.getBytes());
- os.flush();
+ InputStream content = response.getEntity().getContent();
+ return readAllIntoJSONObject(content);
+ }
- if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
- throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage());
+ private JSONObject readJSONObjectFromUrl(String urlString, LivySessionService livySessionService, Map headers) throws IOException, JSONException, SessionManagerException {
+ HttpClient httpClient = livySessionService.getConnection();
+
+ HttpGet request = new HttpGet(urlString);
+ for (Map.Entry entry : headers.entrySet()) {
+ request.addHeader(entry.getKey(), entry.getValue());
}
+ HttpResponse response = httpClient.execute(request);
- InputStream content = connection.getInputStream();
+ InputStream content = response.getEntity().getContent();
+ return readAllIntoJSONObject(content);
+ }
+
+ private JSONObject readAllIntoJSONObject(InputStream content) throws IOException, JSONException {
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONObject(jsonText);
}
- private JSONObject readJSONObjectFromUrl(final String urlString, LivySessionService livySessionService, final Map headers)
- throws IOException, JSONException {
-
- HttpURLConnection connection = livySessionService.getConnection(urlString);
- for (Map.Entry entry : headers.entrySet()) {
- connection.setRequestProperty(entry.getKey(), entry.getValue());
- }
- connection.setRequestMethod("GET");
- connection.setDoOutput(true);
- InputStream content = connection.getInputStream();
- BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
- String jsonText = IOUtils.toString(rd);
- return new JSONObject(jsonText);
- }
}