diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/LICENSE index f3c8ecef26..d86d9641f1 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/LICENSE +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/LICENSE @@ -207,3 +207,51 @@ The Apache NiFi project contains subcomponents with separate copyright notices and license terms. Your use of the source code for the these subcomponents is subject to the terms and conditions of the following licenses. + + The binary distribution of this product bundles 'Bouncy Castle JDK 1.5' + under an MIT style license. + + Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + + The binary distribution of this product bundles 'Slf4j' which is available under + an MIT license. + + Copyright (c) 2004-2013 QOS.ch + All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/NOTICE index 0b2cb0a268..196e2d44a0 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/NOTICE @@ -45,3 +45,25 @@ The following binary components are provided under the Apache Software License v Apache Commons IO Copyright 2002-2016 The Apache Software Foundation + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java index bc20c01c92..8a512f8fac 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java @@ -58,7 +58,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -194,7 +193,6 @@ public class LivySessionController extends AbstractControllerService implements @OnEnabled public void onConfigured(final ConfigurationContext context) { ComponentLog log = getLogger(); - log.info("********** Starting Livy Session Controller Service..."); final String livyHost = context.getProperty(LIVY_HOST).evaluateAttributeExpressions().getValue(); final String livyPort = context.getProperty(LIVY_PORT).evaluateAttributeExpressions().getValue(); @@ -220,9 +218,7 @@ public class LivySessionController extends AbstractControllerService implements manageSessions(); Thread.sleep(sessionManagerStatusInterval); } catch (InterruptedException e) { - e.printStackTrace(); - log.debug("********** " + Thread.currentThread().getName() - + " run() Interrupt Status: " + Thread.currentThread().isInterrupted()); + Thread.currentThread().interrupt(); enabled = false; } catch (IOException ioe) { throw new ProcessException(ioe); @@ -237,15 +233,16 @@ public class LivySessionController extends AbstractControllerService implements public void shutdown() { ComponentLog log = getLogger(); try { - log.info("********** Starting Livy Session Controller Service..."); enabled = false; livySessionManagerThread.interrupt(); livySessionManagerThread.join(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error("Livy Session Manager Thread interrupted"); } } + @Override public Map getSession() { Map sessionMap = new HashMap<>(); try { @@ -265,6 +262,7 @@ public class LivySessionController extends AbstractControllerService implements return sessionMap; } + @Override public HttpURLConnection getConnection(String urlString) throws IOException { URL url = new URL(urlString); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); @@ -288,16 +286,16 @@ public class LivySessionController extends AbstractControllerService implements try { sessionsInfo = listSessions(); if (sessions.isEmpty()) { - log.debug("********** manageSessions() the active session list is empty, populating from acquired list..."); + log.debug("manageSessions() the active session list is empty, populating from acquired list..."); sessions.putAll(sessionsInfo); } for (Integer sessionId : new ArrayList<>(sessions.keySet())) { JSONObject currentSession = sessions.get(sessionId); - log.debug("********** manageSessions() Updating current session: " + currentSession); + log.debug("manageSessions() Updating current session: " + currentSession); if (sessionsInfo.containsKey(sessionId)) { String state = currentSession.getString("state"); String sessionKind = currentSession.getString("kind"); - log.debug("********** manageSessions() controller kind: {}, session kind: {}, session state: {}", + log.debug("manageSessions() controller kind: {}, session kind: {}, session state: {}", new Object[]{controllerKind, sessionKind, state}); if (state.equalsIgnoreCase("idle") && sessionKind.equalsIgnoreCase(controllerKind)) { // Keep track of how many sessions are in an idle state and thus available @@ -319,36 +317,36 @@ public class LivySessionController extends AbstractControllerService implements } } else { // Prune sessions that no longer exist - log.debug("********** manageSessions() session exists in session pool but not in source snapshot, removing from pool..."); + log.debug("manageSessions() session exists in session pool but not in source snapshot, removing from pool..."); sessions.remove(sessionId); // Remove session from session list source of truth snapshot since it has been dealt with sessionsInfo.remove(sessionId); } } int numSessions = sessions.size(); - log.debug("********** manageSessions() There are " + numSessions + " sessions in the pool"); + log.debug("manageSessions() There are " + numSessions + " sessions in the pool"); // Open new sessions equal to the number requested by sessionPoolSize if (numSessions == 0) { for (int i = 0; i < sessionPoolSize; i++) { newSessionInfo = openSession(); sessions.put(newSessionInfo.getInt("id"), newSessionInfo); - log.debug("********** manageSessions() Registered new session: " + newSessionInfo); + log.debug("manageSessions() Registered new session: " + newSessionInfo); } } else { // Open one new session if there are no idle sessions if (idleSessions == 0) { - log.debug("********** manageSessions() There are " + numSessions + " sessions in the pool but none of them are idle sessions, creating..."); + log.debug("manageSessions() There are " + numSessions + " sessions in the pool but none of them are idle sessions, creating..."); newSessionInfo = openSession(); sessions.put(newSessionInfo.getInt("id"), newSessionInfo); - log.debug("********** manageSessions() Registered new session: " + newSessionInfo); + log.debug("manageSessions() Registered new session: " + newSessionInfo); } // Open more sessions if number of sessions is less than target pool size if (numSessions < sessionPoolSize) { - log.debug("********** manageSessions() There are " + numSessions + ", need more sessions to equal requested pool size of " + sessionPoolSize + ", creating..."); + log.debug("manageSessions() There are " + numSessions + ", need more sessions to equal requested pool size of " + sessionPoolSize + ", creating..."); for (int i = 0; i < sessionPoolSize - numSessions; i++) { newSessionInfo = openSession(); sessions.put(newSessionInfo.getInt("id"), newSessionInfo); - log.debug("********** manageSessions() Registered new session: " + newSessionInfo); + log.debug("manageSessions() Registered new session: " + newSessionInfo); } } } @@ -423,7 +421,7 @@ public class LivySessionController extends AbstractControllerService implements } payload.append("}"); - log.debug("********** openSession() Session Payload: " + payload.toString()); + log.debug("openSession() Session Payload: " + payload.toString()); Map headers = new HashMap<>(); headers.put("Content-Type", APPLICATION_JSON); headers.put("X-Requested-By", USER); @@ -431,9 +429,9 @@ public class LivySessionController extends AbstractControllerService implements newSessionInfo = readJSONObjectFromUrlPOST(sessionsUrl, headers, payload.toString()); Thread.sleep(1000); while (newSessionInfo.getString("state").equalsIgnoreCase("starting")) { - log.debug("********** openSession() Waiting for session to start..."); + log.debug("openSession() Waiting for session to start..."); newSessionInfo = getSessionInfo(newSessionInfo.getInt("id")); - log.debug("********** openSession() newSessionInfo: " + newSessionInfo); + log.debug("openSession() newSessionInfo: " + newSessionInfo); Thread.sleep(1000); } @@ -463,28 +461,6 @@ public class LivySessionController extends AbstractControllerService implements return readAllIntoJSONObject(content); } - private JSONArray readJSONArrayFromUrlPOST(String urlString, Map headers, String payload) throws IOException, JSONException { - URL url = new URL(urlString); - HttpURLConnection connection = getConnection(urlString); - connection.setRequestMethod(POST); - connection.setDoOutput(true); - - for (Map.Entry entry : headers.entrySet()) { - connection.setRequestProperty(entry.getKey(), entry.getValue()); - } - - OutputStream os = connection.getOutputStream(); - os.write(payload.getBytes()); - os.flush(); - - if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) { - throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage()); - } - - InputStream content = connection.getInputStream(); - return readAllIntoJSONArray(content); - } - private JSONObject readJSONFromUrl(String urlString, Map headers) throws IOException, JSONException { HttpURLConnection connection = getConnection(urlString); @@ -497,29 +473,12 @@ public class LivySessionController extends AbstractControllerService implements return readAllIntoJSONObject(content); } - private JSONArray readJSONArrayFromUrl(String urlString, Map headers) throws IOException, JSONException { - HttpURLConnection connection = getConnection(urlString); - for (Map.Entry entry : headers.entrySet()) { - connection.setRequestProperty(entry.getKey(), entry.getValue()); - } - connection.setRequestMethod(GET); - connection.setDoOutput(true); - InputStream content = connection.getInputStream(); - return readAllIntoJSONArray(content); - } - private JSONObject readAllIntoJSONObject(InputStream content) throws IOException, JSONException { BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8)); String jsonText = IOUtils.toString(rd); return new JSONObject(jsonText); } - private JSONArray readAllIntoJSONArray(InputStream content) throws IOException, JSONException { - BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8)); - String jsonText = IOUtils.toString(rd); - return new JSONArray(jsonText); - } - private void setSslSocketFactory(HttpsURLConnection httpsURLConnection, SSLContextService sslService, SSLContext sslContext) throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException { final String keystoreLocation = sslService.getKeyStoreFile(); diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/LICENSE index f3c8ecef26..d86d9641f1 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/LICENSE +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/LICENSE @@ -207,3 +207,51 @@ The Apache NiFi project contains subcomponents with separate copyright notices and license terms. Your use of the source code for the these subcomponents is subject to the terms and conditions of the following licenses. + + The binary distribution of this product bundles 'Bouncy Castle JDK 1.5' + under an MIT style license. + + Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + + The binary distribution of this product bundles 'Slf4j' which is available under + an MIT license. + + Copyright (c) 2004-2013 QOS.ch + All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE index 604541ae33..ed9cd87dcb 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE @@ -53,3 +53,37 @@ The following binary components are provided under the Apache Software License v Apache Commons IO Copyright 2002-2016 The Apache Software Foundation + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) Jettison + The following NOTICE information applies: + Copyright 2006 Envoi Solutions LLC + +************************ +Common Development and Distribution License 1.0 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details. + + (CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-1 - no url provided) diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java index d388148d54..59deb36edb 100644 --- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java @@ -84,7 +84,8 @@ public class ExecuteSparkInteractive extends AbstractProcessor { .build(); /** - * Points to the charset name corresponding to the incoming flow file's encoding. + * Points to the charset name corresponding to the incoming flow file's + * encoding. */ public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() .name("exec-spark-iactive-charset") @@ -124,7 +125,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor { private volatile List properties; private volatile Set relationships; - + @Override public void init(final ProcessorInitializationContext context) { List properties = new ArrayList<>(); properties.add(LIVY_CONTROLLER_SERVICE); @@ -194,7 +195,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor { String payload = "{\"code\":\"" + code + "\"}"; try { final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); - log.debug("********** ExecuteSparkInteractive Result of Job Submit: " + result); + log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); if (result == null) { session.transfer(flowFile, REL_FAILURE); } else { @@ -228,23 +229,23 @@ public class ExecuteSparkInteractive extends AbstractProcessor { headers.put("X-Requested-By", LivySessionService.USER); headers.put("Accept", "application/json"); - log.debug("********** submitAndHandleJob() Submitting Job to Spark via: " + statementUrl); + log.debug("submitAndHandleJob() Submitting Job to Spark via: " + statementUrl); try { JSONObject jobInfo = readJSONObjectFromUrlPOST(statementUrl, livySessionService, headers, payload); - log.debug("********** submitAndHandleJob() Job Info: " + jobInfo); + log.debug("submitAndHandleJob() Job Info: " + jobInfo); String statementId = String.valueOf(jobInfo.getInt("id")); statementUrl = statementUrl + "/" + statementId; jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, headers); String jobState = jobInfo.getString("state"); - log.debug("********** submitAndHandleJob() New Job Info: " + jobInfo); + log.debug("submitAndHandleJob() New Job Info: " + jobInfo); Thread.sleep(statusCheckInterval); if (jobState.equalsIgnoreCase("available")) { - log.debug("********** submitAndHandleJob() Job status is: " + jobState + ". returning output..."); + log.debug("submitAndHandleJob() Job status is: " + jobState + ". returning output..."); output = jobInfo.getJSONObject("output"); } else if (jobState.equalsIgnoreCase("running") || jobState.equalsIgnoreCase("waiting")) { while (!jobState.equalsIgnoreCase("available")) { - log.debug("********** submitAndHandleJob() Job status is: " + jobState + ". Waiting for job to complete..."); + log.debug("submitAndHandleJob() Job status is: " + jobState + ". Waiting for job to complete..."); Thread.sleep(statusCheckInterval); jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, headers); jobState = jobInfo.getString("state"); @@ -253,7 +254,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor { } else if (jobState.equalsIgnoreCase("error") || jobState.equalsIgnoreCase("cancelled") || jobState.equalsIgnoreCase("cancelling")) { - log.debug("********** Job status is: " + jobState + ". Job did not complete due to error or has been cancelled. Check SparkUI for details."); + log.debug("Job status is: " + jobState + ". Job did not complete due to error or has been cancelled. Check SparkUI for details."); throw new IOException(jobState); } } catch (JSONException | InterruptedException e) { @@ -301,4 +302,4 @@ public class ExecuteSparkInteractive extends AbstractProcessor { String jsonText = IOUtils.toString(rd); return new JSONObject(jsonText); } -} \ No newline at end of file +}