NIFI-5041: Adds SPNEGO authentication to LivySessionController

NIFI-5041: fixes http client version issue

Change-Id: I1b87ec4752ff6e1603025883a72113919aba5dd4

NIFI-5041: fixes Kerberos configuration

Change-Id: I868fdf3ea7cfd28cf415164e420f23bf3f6eefeb

NIFI-5041: adds new NOTICE entries

NIFI-5041: yields processor if no session is available, fixes error handling in session manager thread, fixes error returned in KerberosKeytabSPNegoScheme on authentication failure

Change-Id: I443e063ae21c446980087e5464a4b70373d730f6

NIFI-5041: makes the session manager thread exceptions visible to the users

Change-Id: I33fde5df6933cec2a87a4d82e681d4464f21b459

NIFI-5041: adds special SessionManagerException to identify error occurred on session manager thread

Change-Id: I25a52c025376a0cd238f14bda533d6f5f3e5fb4a

This closes #2630

Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This commit is contained in:
Peter Toth 2018-04-10 14:27:51 +02:00 committed by Matthew Burgess
parent 83fccc4beb
commit a1794b101e
15 changed files with 466 additions and 76 deletions

6
NOTICE
View File

@ -20,6 +20,12 @@ This includes derived works from the Apache Storm (ASLv2 licensed) project (http
org/apache/storm/hive/common/HiveOptions.java
and can be found in the org.apache.nifi.util.hive package
This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop):
Copyright 2014 The Apache Software Foundation
The derived work is adapted from
org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
and can be found in the org.apache.nifi.hadoop package
This includes derived works from the Apache Hive (ASLv2 licensed) project (https://github.com/apache/hive):
Copyright 2008-2016 The Apache Software Foundation
The derived work is adapted from

View File

@ -20,6 +20,12 @@ This includes derived works from the Apache Storm (ASLv2 licensed) project (http
org/apache/storm/hive/common/HiveOptions.java
and can be found in the org.apache.nifi.util.hive package
This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop):
Copyright 2014 The Apache Software Foundation
The derived work is adapted from
org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
and can be found in the org.apache.nifi.hadoop package
This includes derived works from the Apache Hive (ASLv2 licensed) project (https://github.com/apache/hive):
Copyright 2008-2016 The Apache Software Foundation
The derived work is adapted from

View File

@ -52,6 +52,18 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hadoop;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import javax.security.auth.login.AppConfigurationEntry;
import java.util.HashMap;
import java.util.Map;
/**
* Modified Kerberos configuration class from {@link org.apache.hadoop.security.authentication.client.KerberosAuthenticator.KerberosConfiguration}
* that requires authentication from a keytab.
*/
public class KerberosConfiguration extends javax.security.auth.login.Configuration {
private static final Map<String, String> USER_KERBEROS_OPTIONS = new HashMap<>();
private static final AppConfigurationEntry USER_KERBEROS_LOGIN;
private static final AppConfigurationEntry[] USER_KERBEROS_CONF;
KerberosConfiguration(String principal, String keytab) {
USER_KERBEROS_OPTIONS.put("principal", principal);
USER_KERBEROS_OPTIONS.put("keyTab", keytab);
}
public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
return USER_KERBEROS_CONF;
}
static {
USER_KERBEROS_OPTIONS.put("doNotPrompt", "true");
USER_KERBEROS_OPTIONS.put("useKeyTab", "true");
USER_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
USER_KERBEROS_LOGIN = new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(), AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, USER_KERBEROS_OPTIONS);
USER_KERBEROS_CONF = new AppConfigurationEntry[]{USER_KERBEROS_LOGIN};
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hadoop;
import org.apache.http.auth.Credentials;
import javax.security.auth.kerberos.KerberosPrincipal;
import java.security.Principal;
/**
* Crendentials that incorporate a user principal and a keytab file.
*/
public class KerberosKeytabCredentials implements Credentials {
private final KerberosPrincipal userPrincipal;
private final String keytab;
public KerberosKeytabCredentials(String principalName, String keytab) {
this.userPrincipal = new KerberosPrincipal(principalName);
this.keytab = keytab;
}
@Override
public Principal getUserPrincipal() {
return userPrincipal;
}
@Override
public String getPassword() {
return null;
}
public String getKeytab() {
return keytab;
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hadoop;
import org.apache.http.auth.AuthScheme;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.protocol.HttpContext;
/**
* Provider class for KerberosKeytabSPNegoAuthScheme.
*/
public class KerberosKeytabSPNegoAuthSchemeProvider implements AuthSchemeProvider {
public AuthScheme create(HttpContext context) {
return new KerberosKeytabSPNegoScheme();
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hadoop;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.http.auth.Credentials;
import org.apache.http.impl.auth.SPNegoScheme;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import java.net.UnknownHostException;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.HashSet;
import java.util.Set;
/**
* This class provides a very similar authentication scheme and token generation as {@link SPNegoScheme} does.
* The token generation is based on a keytab file coming from {@link KerberosKeytabCredentials} and the process
* uses hadoop-auth tools.
*/
public class KerberosKeytabSPNegoScheme extends SPNegoScheme {
public KerberosKeytabSPNegoScheme() {
super(true, false);
}
@Override
public byte[] generateToken(byte[] input, String authServer, Credentials credentials) {
Set<Principal> principals = new HashSet<>();
principals.add(credentials.getUserPrincipal());
Subject subject = new Subject(false, principals, new HashSet<>(), new HashSet<>());
try {
LoginContext loginContext = new LoginContext("", subject, null,
new KerberosConfiguration(credentials.getUserPrincipal().getName(),
((KerberosKeytabCredentials) credentials).getKeytab()));
loginContext.login();
Subject loggedInSubject = loginContext.getSubject();
return Subject.doAs(loggedInSubject, new PrivilegedExceptionAction<byte[]>() {
public byte[] run() throws UnknownHostException, ClassNotFoundException, GSSException,
IllegalAccessException, NoSuchFieldException {
GSSManager gssManager = GSSManager.getInstance();
String servicePrincipal = KerberosUtil.getServicePrincipal("HTTP", authServer);
Oid serviceOid = KerberosUtil.getOidInstance("NT_GSS_KRB5_PRINCIPAL");
GSSName serviceName = gssManager.createName(servicePrincipal, serviceOid);
Oid mechOid = KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID");
GSSContext gssContext = gssManager.createContext(serviceName, mechOid, null, 0);
gssContext.requestCredDeleg(true);
gssContext.requestMutualAuth(true);
return gssContext.initSecContext(input, 0, input.length);
}
});
} catch (PrivilegedActionException | LoginException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,11 @@
nifi-hadoop-utils
Copyright 2014-2018 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
This includes derived works from the Apache Hadoop (ASLv2 licensed) project (https://github.com/apache/hadoop):
Copyright 2014 The Apache Software Foundation
The derived work is adapted from
org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
and can be found in the org.apache.nifi.hadoop package

View File

@ -28,5 +28,10 @@
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
</dependencies>
</project>

View File

@ -17,18 +17,17 @@
package org.apache.nifi.controller.api.livy;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Map;
import org.apache.http.client.HttpClient;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
public interface LivySessionService extends ControllerService {
String APPLICATION_JSON = "application/json";
String USER = "nifi";
String GET = "GET";
String POST = "POST";
Map<String, String> getSession();
Map<String, String> getSession() throws SessionManagerException;
HttpURLConnection getConnection(String urlString) throws IOException;
HttpClient getConnection() throws IOException, SessionManagerException;
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.api.livy.exception;
public class SessionManagerException extends Exception {
private static final long serialVersionUID = 1L;
public SessionManagerException(final Throwable t) {
super(t);
}
}

View File

@ -60,5 +60,29 @@
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
<version>1.7.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.7.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -21,11 +21,8 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStore;
@ -45,6 +42,22 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@ -53,8 +66,11 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
import org.apache.nifi.hadoop.KerberosKeytabCredentials;
import org.apache.nifi.hadoop.KerberosKeytabSPNegoAuthSchemeProvider;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.codehaus.jackson.map.ObjectMapper;
@ -64,10 +80,8 @@ import org.codehaus.jettison.json.JSONObject;
import org.apache.nifi.controller.api.livy.LivySessionService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
@Tags({"Livy", "REST", "Spark", "http"})
@ -157,6 +171,14 @@ public class LivySessionController extends AbstractControllerService implements
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
private volatile String livyUrl;
private volatile int sessionPoolSize;
private volatile String controllerKind;
@ -168,6 +190,8 @@ public class LivySessionController extends AbstractControllerService implements
private volatile int connectTimeout;
private volatile Thread livySessionManagerThread = null;
private volatile boolean enabled = true;
private volatile KerberosCredentialsService credentialsService;
private volatile SessionManagerException sessionManagerException;
private List<PropertyDescriptor> properties;
@ -183,6 +207,7 @@ public class LivySessionController extends AbstractControllerService implements
props.add(CONNECT_TIMEOUT);
props.add(JARS);
props.add(FILES);
props.add(KERBEROS_CREDENTIALS_SERVICE);
properties = Collections.unmodifiableList(props);
}
@ -204,6 +229,7 @@ public class LivySessionController extends AbstractControllerService implements
sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
connectTimeout = Math.toIntExact(context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
this.livyUrl = "http" + (sslContextService != null ? "s" : "") + "://" + livyHost + ":" + livyPort;
this.controllerKind = sessionKind;
@ -216,12 +242,16 @@ public class LivySessionController extends AbstractControllerService implements
while (enabled) {
try {
manageSessions();
sessionManagerException = null;
} catch (Exception e) {
getLogger().error("Livy Session Manager Thread run into an error, but continues to run", e);
sessionManagerException = new SessionManagerException(e);
}
try {
Thread.sleep(sessionManagerStatusInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
enabled = false;
} catch (IOException ioe) {
throw new ProcessException(ioe);
}
}
});
@ -243,7 +273,9 @@ public class LivySessionController extends AbstractControllerService implements
}
@Override
public Map<String, String> getSession() {
public Map<String, String> getSession() throws SessionManagerException {
checkSessionManagerException();
Map<String, String> sessionMap = new HashMap<>();
try {
final Map<Integer, JSONObject> sessionsCopy = sessions;
@ -254,6 +286,7 @@ public class LivySessionController extends AbstractControllerService implements
if (state.equalsIgnoreCase("idle") && sessionKind.equalsIgnoreCase(controllerKind)) {
sessionMap.put("sessionId", String.valueOf(sessionId));
sessionMap.put("livyUrl", livyUrl);
break;
}
}
} catch (JSONException e) {
@ -263,18 +296,41 @@ public class LivySessionController extends AbstractControllerService implements
}
@Override
public HttpURLConnection getConnection(String urlString) throws IOException {
URL url = new URL(urlString);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(connectTimeout);
public HttpClient getConnection() throws IOException, SessionManagerException {
checkSessionManagerException();
return openConnection();
}
private HttpClient openConnection() throws IOException {
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
if (sslContextService != null) {
try {
setSslSocketFactory((HttpsURLConnection) connection, sslContextService, sslContext);
SSLContext sslContext = getSslSocketFactory(sslContextService);
httpClientBuilder.setSSLContext(sslContext);
} catch (KeyStoreException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyManagementException e) {
throw new IOException(e);
}
}
return connection;
if (credentialsService != null) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(new AuthScope(null, -1, null),
new KerberosKeytabCredentials(credentialsService.getPrincipal(), credentialsService.getKeytab()));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider> create()
.register(AuthSchemes.SPNEGO, new KerberosKeytabSPNegoAuthSchemeProvider()).build();
httpClientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
}
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
requestConfigBuilder.setConnectTimeout(connectTimeout);
requestConfigBuilder.setConnectionRequestTimeout(connectTimeout);
requestConfigBuilder.setSocketTimeout(connectTimeout);
httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
return httpClientBuilder.build();
}
private void manageSessions() throws InterruptedException, IOException {
@ -439,36 +495,34 @@ public class LivySessionController extends AbstractControllerService implements
}
private JSONObject readJSONObjectFromUrlPOST(String urlString, Map<String, String> headers, String payload) throws IOException, JSONException {
HttpURLConnection connection = getConnection(urlString);
connection.setRequestMethod(POST);
connection.setDoOutput(true);
HttpClient httpClient = openConnection();
HttpPost request = new HttpPost(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
request.addHeader(entry.getKey(), entry.getValue());
}
HttpEntity httpEntity = new StringEntity(payload);
request.setEntity(httpEntity);
HttpResponse response = httpClient.execute(request);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) {
throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode() + " : " + response.getStatusLine().getReasonPhrase());
}
OutputStream os = connection.getOutputStream();
os.write(payload.getBytes());
os.flush();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage());
}
InputStream content = connection.getInputStream();
InputStream content = response.getEntity().getContent();
return readAllIntoJSONObject(content);
}
private JSONObject readJSONFromUrl(String urlString, Map<String, String> headers) throws IOException, JSONException {
HttpClient httpClient = openConnection();
HttpURLConnection connection = getConnection(urlString);
HttpGet request = new HttpGet(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
request.addHeader(entry.getKey(), entry.getValue());
}
connection.setRequestMethod(GET);
connection.setDoOutput(true);
InputStream content = connection.getInputStream();
HttpResponse response = httpClient.execute(request);
InputStream content = response.getEntity().getContent();
return readAllIntoJSONObject(content);
}
@ -478,7 +532,7 @@ public class LivySessionController extends AbstractControllerService implements
return new JSONObject(jsonText);
}
private void setSslSocketFactory(HttpsURLConnection httpsURLConnection, SSLContextService sslService, SSLContext sslContext)
private SSLContext getSslSocketFactory(SSLContextService sslService)
throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException {
final String keystoreLocation = sslService.getKeyStoreFile();
final String keystorePass = sslService.getKeyStorePassword();
@ -506,7 +560,14 @@ public class LivySessionController extends AbstractControllerService implements
sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
httpsURLConnection.setSSLSocketFactory(sslSocketFactory);
return sslContext;
}
private void checkSessionManagerException() throws SessionManagerException {
SessionManagerException exception = sessionManagerException;
if (exception != null) {
throw sessionManagerException;
}
}
}

View File

@ -86,5 +86,10 @@
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -20,8 +20,6 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@ -37,10 +35,18 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@ -162,10 +168,19 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
final ComponentLog log = getLogger();
final LivySessionService livySessionService = context.getProperty(LIVY_CONTROLLER_SERVICE).asControllerService(LivySessionService.class);
final Map<String, String> livyController = livySessionService.getSession();
final Map<String, String> livyController;
try {
livyController = livySessionService.getSession();
if (livyController == null || livyController.isEmpty()) {
log.debug("No Spark session available (yet), routing flowfile to wait");
session.transfer(flowFile, REL_WAIT);
context.yield();
return;
}
} catch (SessionManagerException sme) {
log.error("Error opening spark session, routing flowfile to wait", sme);
session.transfer(flowFile, REL_WAIT);
context.yield();
return;
}
final long statusCheckInterval = context.getProperty(STATUS_CHECK_INTERVAL).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
@ -214,14 +229,15 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
}
}
} catch (IOException ioe) {
log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
} catch (IOException | SessionManagerException e) {
log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, e.getMessage()}, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
private JSONObject submitAndHandleJob(String livyUrl, LivySessionService livySessionService, String sessionId, String payload, long statusCheckInterval) throws IOException {
private JSONObject submitAndHandleJob(String livyUrl, LivySessionService livySessionService, String sessionId, String payload, long statusCheckInterval)
throws IOException, SessionManagerException {
ComponentLog log = getLogger();
String statementUrl = livyUrl + "/sessions/" + sessionId + "/statements";
JSONObject output = null;
@ -265,42 +281,42 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
}
private JSONObject readJSONObjectFromUrlPOST(String urlString, LivySessionService livySessionService, Map<String, String> headers, String payload)
throws IOException, JSONException {
HttpURLConnection connection = livySessionService.getConnection(urlString);
connection.setRequestMethod("POST");
connection.setDoOutput(true);
throws IOException, JSONException, SessionManagerException {
HttpClient httpClient = livySessionService.getConnection();
HttpPost request = new HttpPost(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
request.addHeader(entry.getKey(), entry.getValue());
}
HttpEntity httpEntity = new StringEntity(payload);
request.setEntity(httpEntity);
HttpResponse response = httpClient.execute(request);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK && response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) {
throw new RuntimeException("Failed : HTTP error code : " + response.getStatusLine().getStatusCode() + " : " + response.getStatusLine().getReasonPhrase());
}
OutputStream os = connection.getOutputStream();
os.write(payload.getBytes());
os.flush();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage());
InputStream content = response.getEntity().getContent();
return readAllIntoJSONObject(content);
}
InputStream content = connection.getInputStream();
private JSONObject readJSONObjectFromUrl(String urlString, LivySessionService livySessionService, Map<String, String> headers) throws IOException, JSONException, SessionManagerException {
HttpClient httpClient = livySessionService.getConnection();
HttpGet request = new HttpGet(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
request.addHeader(entry.getKey(), entry.getValue());
}
HttpResponse response = httpClient.execute(request);
InputStream content = response.getEntity().getContent();
return readAllIntoJSONObject(content);
}
private JSONObject readAllIntoJSONObject(InputStream content) throws IOException, JSONException {
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONObject(jsonText);
}
private JSONObject readJSONObjectFromUrl(final String urlString, LivySessionService livySessionService, final Map<String, String> headers)
throws IOException, JSONException {
HttpURLConnection connection = livySessionService.getConnection(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
connection.setRequestMethod("GET");
connection.setDoOutput(true);
InputStream content = connection.getInputStream();
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONObject(jsonText);
}
}