diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 6bd2bb7c24..0bea06a104 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -291,6 +291,11 @@ The following binary components are provided under the Apache Software License v
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+ (ASLv2) Apache Commons Text
+ The following NOTICE information applies:
+ Apache Commons Text
+ Copyright 2001-2018 The Apache Software Foundation
+
(ASLv2) Apache Commons Configuration
The following NOTICE information applies:
Apache Commons Configuration
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE
index ffbe292b79..787d2e492d 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE
@@ -53,6 +53,11 @@ The following binary components are provided under the Apache Software License v
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
+ (ASLv2) Apache Commons Text
+ The following NOTICE information applies:
+ Apache Commons Text
+ Copyright 2001-2018 The Apache Software Foundation
+
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
index 70f47f2e01..1854a26c12 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
@@ -98,5 +98,10 @@
1.7.0-SNAPSHOT
test
+
+ org.apache.commons
+ commons-text
+ 1.3
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
index 4a878429fe..d8ca9e10ce 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
@@ -33,7 +33,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.text.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -207,7 +207,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
}
}
- code = StringEscapeUtils.escapeJavaScript(code);
+ code = StringEscapeUtils.escapeJson(code);
String payload = "{\"code\":\"" + code + "\"}";
try {
final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java
index 3a2c67a0f9..f0076e7853 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java
@@ -16,37 +16,43 @@
*/
package org.apache.nifi.processors.livy;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.web.util.TestServer;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.List;
-public class ExecuteSparkInteractiveTestBase {
+class ExecuteSparkInteractiveTestBase {
public static class LivyAPIHandler extends AbstractHandler {
int session1Requests = 0;
@Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
- response.setStatus(200);
+ int responseStatus = 404;
+ String responseContentType = "text/plain";
+ String responseBody = "Not found";
if ("GET".equalsIgnoreCase(request.getMethod())) {
-
- String responseBody = "{}";
- response.setContentType("application/json");
-
+ responseStatus = 200;
+ responseBody = "{}";
+ responseContentType = "application/json";
if ("/sessions".equalsIgnoreCase(target)) {
responseBody = "{\"sessions\": [{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}]}";
} else if (target.startsWith("/sessions/") && !target.contains("statement")) {
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
-
} else if ("/sessions/1/statements/7".equalsIgnoreCase(target)) {
switch (session1Requests) {
case 0:
@@ -64,33 +70,55 @@ public class ExecuteSparkInteractiveTestBase {
}
session1Requests++;
}
-
- response.setContentLength(responseBody.length());
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(responseBody);
- writer.flush();
- }
-
} else if ("POST".equalsIgnoreCase(request.getMethod())) {
+ String requestBody = IOUtils.toString(request.getReader());
+ try {
+ // validate JSON payload
+ new ObjectMapper().readTree(requestBody);
- String responseBody = "{}";
- response.setContentType("application/json");
-
- if ("/sessions".equalsIgnoreCase(target)) {
- responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
- } else if ("/sessions/1/statements".equalsIgnoreCase(target)) {
- responseBody = "{\"id\": 7}";
+ responseStatus = 200;
+ responseBody = "{}";
+ responseContentType = "application/json";
+ if ("/sessions".equalsIgnoreCase(target)) {
+ responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
+ } else if ("/sessions/1/statements".equalsIgnoreCase(target)) {
+ responseBody = "{\"id\": 7}";
+ }
+ } catch (JsonProcessingException e) {
+ responseStatus = 400;
+ responseContentType = "text/plain";
+ responseBody = "Bad request";
}
-
- response.setContentLength(responseBody.length());
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(responseBody);
- writer.flush();
- }
-
}
+
+ response.setStatus(responseStatus);
+ response.setContentType(responseContentType);
+ response.setContentLength(responseBody.length());
+
+ try (PrintWriter writer = response.getWriter()) {
+ writer.print(responseBody);
+ writer.flush();
+ }
+
}
}
+
+ TestRunner runner;
+
+ void testCode(TestServer server, String code) throws Exception {
+ server.addHandler(new LivyAPIHandler());
+
+ runner.enqueue(code);
+ runner.run();
+ List waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
+ while (!waitingFlowfiles.isEmpty()) {
+ Thread.sleep(1000);
+ runner.clearTransferState();
+ runner.enqueue(code);
+ runner.run();
+ waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
+ }
+ runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java
index 1be718ad81..992e2e5c6c 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java
@@ -17,26 +17,18 @@
package org.apache.nifi.processors.livy;
import org.apache.nifi.controller.livy.LivySessionController;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
-import org.eclipse.jetty.server.Handler;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-import java.util.List;
-
public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase {
- public static TestServer server;
- public static String url;
-
- public TestRunner runner;
+ private static TestServer server;
+ private static String url;
@BeforeClass
public static void beforeClass() throws Exception {
@@ -52,10 +44,6 @@ public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase
url = server.getUrl();
}
- public void addHandler(Handler handler) {
- server.addHandler(handler);
- }
-
@AfterClass
public static void afterClass() throws Exception {
server.shutdownServer();
@@ -79,25 +67,17 @@ public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase
runner.shutdown();
}
- private static TestServer createServer() throws IOException {
+ private static TestServer createServer() {
return new TestServer();
}
@Test
public void testSparkSession() throws Exception {
+ testCode(server, "print \"hello world\"");
+ }
- addHandler(new LivyAPIHandler());
-
- runner.enqueue("print \"hello world\"");
- runner.run();
- List waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
- while (!waitingFlowfiles.isEmpty()) {
- Thread.sleep(1000);
- runner.clearTransferState();
- runner.enqueue("print \"hello world\"");
- runner.run();
- waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
- }
- runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
+ @Test
+ public void testSparkSessionWithSpecialChars() throws Exception {
+ testCode(server, "print \"/'?!<>[]{}()$&*=%;.|_-\\\"");
}
}
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java
index 3e84cba64d..c5cec742ad 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java
@@ -18,30 +18,23 @@ package org.apache.nifi.processors.livy;
import org.apache.nifi.controller.livy.LivySessionController;
import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
-import org.eclipse.jetty.server.Handler;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestBase {
private static Map sslProperties;
- public static TestServer server;
- public static String url;
-
- public TestRunner runner;
+ private static TestServer server;
+ private static String url;
@BeforeClass
public static void beforeClass() throws Exception {
@@ -64,10 +57,6 @@ public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestB
url = server.getSecureUrl();
}
- public void addHandler(Handler handler) {
- server.addHandler(handler);
- }
-
@AfterClass
public static void afterClass() throws Exception {
server.shutdownServer();
@@ -101,25 +90,13 @@ public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestB
runner.shutdown();
}
- private static TestServer createServer() throws IOException {
+ private static TestServer createServer() {
return new TestServer(sslProperties);
}
@Test
- public void testSslSparkSession() throws Exception {
- addHandler(new LivyAPIHandler());
-
- runner.enqueue("print \"hello world\"");
- runner.run();
- List waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
- while (!waitingFlowfiles.isEmpty()) {
- Thread.sleep(1000);
- runner.clearTransferState();
- runner.enqueue("print \"hello world\"");
- runner.run();
- waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
- }
- runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
+ public void testSparkSession() throws Exception {
+ testCode(server,"print \"hello world\"");
}
private static Map createSslProperties() {