NIFI-4683 This closes #2339. added a few missing license/notice entries for spark controller service and processor nars, removed dead code, cleaned up logging. This sign off is relevant to commit 52684c3ec3b7c0ee28c6cf64cfd178976996615a

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
joewitt 2017-12-14 15:52:43 -05:00
parent 2192138b06
commit a12abc24e5
6 changed files with 180 additions and 68 deletions

View File

@ -207,3 +207,51 @@ The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
under an MIT style license.
Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
The binary distribution of this product bundles 'Slf4j' which is available under
an MIT license.
Copyright (c) 2004-2013 QOS.ch
All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -45,3 +45,25 @@ The following binary components are provided under the Apache Software License v
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.

View File

@ -58,7 +58,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@ -194,7 +193,6 @@ public class LivySessionController extends AbstractControllerService implements
@OnEnabled
public void onConfigured(final ConfigurationContext context) {
ComponentLog log = getLogger();
log.info("********** Starting Livy Session Controller Service...");
final String livyHost = context.getProperty(LIVY_HOST).evaluateAttributeExpressions().getValue();
final String livyPort = context.getProperty(LIVY_PORT).evaluateAttributeExpressions().getValue();
@ -220,9 +218,7 @@ public class LivySessionController extends AbstractControllerService implements
manageSessions();
Thread.sleep(sessionManagerStatusInterval);
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("********** " + Thread.currentThread().getName()
+ " run() Interrupt Status: " + Thread.currentThread().isInterrupted());
Thread.currentThread().interrupt();
enabled = false;
} catch (IOException ioe) {
throw new ProcessException(ioe);
@ -237,15 +233,16 @@ public class LivySessionController extends AbstractControllerService implements
public void shutdown() {
ComponentLog log = getLogger();
try {
log.info("********** Starting Livy Session Controller Service...");
enabled = false;
livySessionManagerThread.interrupt();
livySessionManagerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Livy Session Manager Thread interrupted");
}
}
@Override
public Map<String, String> getSession() {
Map<String, String> sessionMap = new HashMap<>();
try {
@ -265,6 +262,7 @@ public class LivySessionController extends AbstractControllerService implements
return sessionMap;
}
@Override
public HttpURLConnection getConnection(String urlString) throws IOException {
URL url = new URL(urlString);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
@ -288,16 +286,16 @@ public class LivySessionController extends AbstractControllerService implements
try {
sessionsInfo = listSessions();
if (sessions.isEmpty()) {
log.debug("********** manageSessions() the active session list is empty, populating from acquired list...");
log.debug("manageSessions() the active session list is empty, populating from acquired list...");
sessions.putAll(sessionsInfo);
}
for (Integer sessionId : new ArrayList<>(sessions.keySet())) {
JSONObject currentSession = sessions.get(sessionId);
log.debug("********** manageSessions() Updating current session: " + currentSession);
log.debug("manageSessions() Updating current session: " + currentSession);
if (sessionsInfo.containsKey(sessionId)) {
String state = currentSession.getString("state");
String sessionKind = currentSession.getString("kind");
log.debug("********** manageSessions() controller kind: {}, session kind: {}, session state: {}",
log.debug("manageSessions() controller kind: {}, session kind: {}, session state: {}",
new Object[]{controllerKind, sessionKind, state});
if (state.equalsIgnoreCase("idle") && sessionKind.equalsIgnoreCase(controllerKind)) {
// Keep track of how many sessions are in an idle state and thus available
@ -319,36 +317,36 @@ public class LivySessionController extends AbstractControllerService implements
}
} else {
// Prune sessions that no longer exist
log.debug("********** manageSessions() session exists in session pool but not in source snapshot, removing from pool...");
log.debug("manageSessions() session exists in session pool but not in source snapshot, removing from pool...");
sessions.remove(sessionId);
// Remove session from session list source of truth snapshot since it has been dealt with
sessionsInfo.remove(sessionId);
}
}
int numSessions = sessions.size();
log.debug("********** manageSessions() There are " + numSessions + " sessions in the pool");
log.debug("manageSessions() There are " + numSessions + " sessions in the pool");
// Open new sessions equal to the number requested by sessionPoolSize
if (numSessions == 0) {
for (int i = 0; i < sessionPoolSize; i++) {
newSessionInfo = openSession();
sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
log.debug("********** manageSessions() Registered new session: " + newSessionInfo);
log.debug("manageSessions() Registered new session: " + newSessionInfo);
}
} else {
// Open one new session if there are no idle sessions
if (idleSessions == 0) {
log.debug("********** manageSessions() There are " + numSessions + " sessions in the pool but none of them are idle sessions, creating...");
log.debug("manageSessions() There are " + numSessions + " sessions in the pool but none of them are idle sessions, creating...");
newSessionInfo = openSession();
sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
log.debug("********** manageSessions() Registered new session: " + newSessionInfo);
log.debug("manageSessions() Registered new session: " + newSessionInfo);
}
// Open more sessions if number of sessions is less than target pool size
if (numSessions < sessionPoolSize) {
log.debug("********** manageSessions() There are " + numSessions + ", need more sessions to equal requested pool size of " + sessionPoolSize + ", creating...");
log.debug("manageSessions() There are " + numSessions + ", need more sessions to equal requested pool size of " + sessionPoolSize + ", creating...");
for (int i = 0; i < sessionPoolSize - numSessions; i++) {
newSessionInfo = openSession();
sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
log.debug("********** manageSessions() Registered new session: " + newSessionInfo);
log.debug("manageSessions() Registered new session: " + newSessionInfo);
}
}
}
@ -423,7 +421,7 @@ public class LivySessionController extends AbstractControllerService implements
}
payload.append("}");
log.debug("********** openSession() Session Payload: " + payload.toString());
log.debug("openSession() Session Payload: " + payload.toString());
Map<String, String> headers = new HashMap<>();
headers.put("Content-Type", APPLICATION_JSON);
headers.put("X-Requested-By", USER);
@ -431,9 +429,9 @@ public class LivySessionController extends AbstractControllerService implements
newSessionInfo = readJSONObjectFromUrlPOST(sessionsUrl, headers, payload.toString());
Thread.sleep(1000);
while (newSessionInfo.getString("state").equalsIgnoreCase("starting")) {
log.debug("********** openSession() Waiting for session to start...");
log.debug("openSession() Waiting for session to start...");
newSessionInfo = getSessionInfo(newSessionInfo.getInt("id"));
log.debug("********** openSession() newSessionInfo: " + newSessionInfo);
log.debug("openSession() newSessionInfo: " + newSessionInfo);
Thread.sleep(1000);
}
@ -463,28 +461,6 @@ public class LivySessionController extends AbstractControllerService implements
return readAllIntoJSONObject(content);
}
private JSONArray readJSONArrayFromUrlPOST(String urlString, Map<String, String> headers, String payload) throws IOException, JSONException {
URL url = new URL(urlString);
HttpURLConnection connection = getConnection(urlString);
connection.setRequestMethod(POST);
connection.setDoOutput(true);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
OutputStream os = connection.getOutputStream();
os.write(payload.getBytes());
os.flush();
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage());
}
InputStream content = connection.getInputStream();
return readAllIntoJSONArray(content);
}
private JSONObject readJSONFromUrl(String urlString, Map<String, String> headers) throws IOException, JSONException {
HttpURLConnection connection = getConnection(urlString);
@ -497,29 +473,12 @@ public class LivySessionController extends AbstractControllerService implements
return readAllIntoJSONObject(content);
}
private JSONArray readJSONArrayFromUrl(String urlString, Map<String, String> headers) throws IOException, JSONException {
HttpURLConnection connection = getConnection(urlString);
for (Map.Entry<String, String> entry : headers.entrySet()) {
connection.setRequestProperty(entry.getKey(), entry.getValue());
}
connection.setRequestMethod(GET);
connection.setDoOutput(true);
InputStream content = connection.getInputStream();
return readAllIntoJSONArray(content);
}
private JSONObject readAllIntoJSONObject(InputStream content) throws IOException, JSONException {
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONObject(jsonText);
}
private JSONArray readAllIntoJSONArray(InputStream content) throws IOException, JSONException {
BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8));
String jsonText = IOUtils.toString(rd);
return new JSONArray(jsonText);
}
private void setSslSocketFactory(HttpsURLConnection httpsURLConnection, SSLContextService sslService, SSLContext sslContext)
throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException {
final String keystoreLocation = sslService.getKeyStoreFile();

View File

@ -207,3 +207,51 @@ The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
under an MIT style license.
Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
The binary distribution of this product bundles 'Slf4j' which is available under
an MIT license.
Copyright (c) 2004-2013 QOS.ch
All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -53,3 +53,37 @@ The following binary components are provided under the Apache Software License v
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
(ASLv2) Jettison
The following NOTICE information applies:
Copyright 2006 Envoi Solutions LLC
************************
Common Development and Distribution License 1.0
************************
The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details.
(CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-1 - no url provided)

View File

@ -84,7 +84,8 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
.build();
/**
* Points to the charset name corresponding to the incoming flow file's encoding.
* Points to the charset name corresponding to the incoming flow file's
* encoding.
*/
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("exec-spark-iactive-charset")
@ -124,7 +125,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
private volatile List<PropertyDescriptor> properties;
private volatile Set<Relationship> relationships;
@Override
public void init(final ProcessorInitializationContext context) {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LIVY_CONTROLLER_SERVICE);
@ -194,7 +195,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
String payload = "{\"code\":\"" + code + "\"}";
try {
final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
log.debug("********** ExecuteSparkInteractive Result of Job Submit: " + result);
log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
if (result == null) {
session.transfer(flowFile, REL_FAILURE);
} else {
@ -228,23 +229,23 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
headers.put("X-Requested-By", LivySessionService.USER);
headers.put("Accept", "application/json");
log.debug("********** submitAndHandleJob() Submitting Job to Spark via: " + statementUrl);
log.debug("submitAndHandleJob() Submitting Job to Spark via: " + statementUrl);
try {
JSONObject jobInfo = readJSONObjectFromUrlPOST(statementUrl, livySessionService, headers, payload);
log.debug("********** submitAndHandleJob() Job Info: " + jobInfo);
log.debug("submitAndHandleJob() Job Info: " + jobInfo);
String statementId = String.valueOf(jobInfo.getInt("id"));
statementUrl = statementUrl + "/" + statementId;
jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, headers);
String jobState = jobInfo.getString("state");
log.debug("********** submitAndHandleJob() New Job Info: " + jobInfo);
log.debug("submitAndHandleJob() New Job Info: " + jobInfo);
Thread.sleep(statusCheckInterval);
if (jobState.equalsIgnoreCase("available")) {
log.debug("********** submitAndHandleJob() Job status is: " + jobState + ". returning output...");
log.debug("submitAndHandleJob() Job status is: " + jobState + ". returning output...");
output = jobInfo.getJSONObject("output");
} else if (jobState.equalsIgnoreCase("running") || jobState.equalsIgnoreCase("waiting")) {
while (!jobState.equalsIgnoreCase("available")) {
log.debug("********** submitAndHandleJob() Job status is: " + jobState + ". Waiting for job to complete...");
log.debug("submitAndHandleJob() Job status is: " + jobState + ". Waiting for job to complete...");
Thread.sleep(statusCheckInterval);
jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, headers);
jobState = jobInfo.getString("state");
@ -253,7 +254,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
} else if (jobState.equalsIgnoreCase("error")
|| jobState.equalsIgnoreCase("cancelled")
|| jobState.equalsIgnoreCase("cancelling")) {
log.debug("********** Job status is: " + jobState + ". Job did not complete due to error or has been cancelled. Check SparkUI for details.");
log.debug("Job status is: " + jobState + ". Job did not complete due to error or has been cancelled. Check SparkUI for details.");
throw new IOException(jobState);
}
} catch (JSONException | InterruptedException e) {