mirror of https://github.com/apache/nifi.git
NIFI-5278: fixes JSON escaping of code
Change-Id: I2cb0e6c658d4a0f2aad9c4aab9201a3334ee54df NIFI-5278: adds Apache Commons Text to NOTICE Change-Id: I8185239b0a888c16159b18f13d6682ba350cc766 NIFI-5278: adds tests Change-Id: I9286ac71bc7399e5bdc1e6602609b5e8829db27e NIFI-5278: fixes review findings Change-Id: I292c93dae877cf1cd146f3897b7e132b6afac801 Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #2768
This commit is contained in:
parent
a64680bfa5
commit
241bc2aa89
|
@ -291,6 +291,11 @@ The following binary components are provided under the Apache Software License v
|
||||||
This product includes software from the Spring Framework,
|
This product includes software from the Spring Framework,
|
||||||
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
|
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
|
||||||
|
|
||||||
|
(ASLv2) Apache Commons Text
|
||||||
|
The following NOTICE information applies:
|
||||||
|
Apache Commons Text
|
||||||
|
Copyright 2001-2018 The Apache Software Foundation
|
||||||
|
|
||||||
(ASLv2) Apache Commons Configuration
|
(ASLv2) Apache Commons Configuration
|
||||||
The following NOTICE information applies:
|
The following NOTICE information applies:
|
||||||
Apache Commons Configuration
|
Apache Commons Configuration
|
||||||
|
|
|
@ -53,6 +53,11 @@ The following binary components are provided under the Apache Software License v
|
||||||
Apache Commons IO
|
Apache Commons IO
|
||||||
Copyright 2002-2016 The Apache Software Foundation
|
Copyright 2002-2016 The Apache Software Foundation
|
||||||
|
|
||||||
|
(ASLv2) Apache Commons Text
|
||||||
|
The following NOTICE information applies:
|
||||||
|
Apache Commons Text
|
||||||
|
Copyright 2001-2018 The Apache Software Foundation
|
||||||
|
|
||||||
(ASLv2) Jackson JSON processor
|
(ASLv2) Jackson JSON processor
|
||||||
The following NOTICE information applies:
|
The following NOTICE information applies:
|
||||||
# Jackson JSON processor
|
# Jackson JSON processor
|
||||||
|
|
|
@ -98,5 +98,10 @@
|
||||||
<version>1.7.0-SNAPSHOT</version>
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-text</artifactId>
|
||||||
|
<version>1.3</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
|
@ -33,7 +33,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang.StringEscapeUtils;
|
import org.apache.commons.text.StringEscapeUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.http.HttpEntity;
|
import org.apache.http.HttpEntity;
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
|
@ -207,7 +207,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = StringEscapeUtils.escapeJavaScript(code);
|
code = StringEscapeUtils.escapeJson(code);
|
||||||
String payload = "{\"code\":\"" + code + "\"}";
|
String payload = "{\"code\":\"" + code + "\"}";
|
||||||
try {
|
try {
|
||||||
final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
|
final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
|
||||||
|
|
|
@ -16,37 +16,43 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.livy;
|
package org.apache.nifi.processors.livy;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.nifi.web.util.TestServer;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.eclipse.jetty.server.Request;
|
import org.eclipse.jetty.server.Request;
|
||||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
|
|
||||||
import javax.servlet.ServletException;
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class ExecuteSparkInteractiveTestBase {
|
class ExecuteSparkInteractiveTestBase {
|
||||||
|
|
||||||
public static class LivyAPIHandler extends AbstractHandler {
|
public static class LivyAPIHandler extends AbstractHandler {
|
||||||
|
|
||||||
int session1Requests = 0;
|
int session1Requests = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
|
||||||
baseRequest.setHandled(true);
|
baseRequest.setHandled(true);
|
||||||
|
|
||||||
response.setStatus(200);
|
int responseStatus = 404;
|
||||||
|
String responseContentType = "text/plain";
|
||||||
|
String responseBody = "Not found";
|
||||||
|
|
||||||
if ("GET".equalsIgnoreCase(request.getMethod())) {
|
if ("GET".equalsIgnoreCase(request.getMethod())) {
|
||||||
|
responseStatus = 200;
|
||||||
String responseBody = "{}";
|
responseBody = "{}";
|
||||||
response.setContentType("application/json");
|
responseContentType = "application/json";
|
||||||
|
|
||||||
if ("/sessions".equalsIgnoreCase(target)) {
|
if ("/sessions".equalsIgnoreCase(target)) {
|
||||||
responseBody = "{\"sessions\": [{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}]}";
|
responseBody = "{\"sessions\": [{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}]}";
|
||||||
} else if (target.startsWith("/sessions/") && !target.contains("statement")) {
|
} else if (target.startsWith("/sessions/") && !target.contains("statement")) {
|
||||||
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
|
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
|
||||||
|
|
||||||
} else if ("/sessions/1/statements/7".equalsIgnoreCase(target)) {
|
} else if ("/sessions/1/statements/7".equalsIgnoreCase(target)) {
|
||||||
switch (session1Requests) {
|
switch (session1Requests) {
|
||||||
case 0:
|
case 0:
|
||||||
|
@ -64,33 +70,55 @@ public class ExecuteSparkInteractiveTestBase {
|
||||||
}
|
}
|
||||||
session1Requests++;
|
session1Requests++;
|
||||||
}
|
}
|
||||||
|
|
||||||
response.setContentLength(responseBody.length());
|
|
||||||
|
|
||||||
try (PrintWriter writer = response.getWriter()) {
|
|
||||||
writer.print(responseBody);
|
|
||||||
writer.flush();
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if ("POST".equalsIgnoreCase(request.getMethod())) {
|
} else if ("POST".equalsIgnoreCase(request.getMethod())) {
|
||||||
|
String requestBody = IOUtils.toString(request.getReader());
|
||||||
|
try {
|
||||||
|
// validate JSON payload
|
||||||
|
new ObjectMapper().readTree(requestBody);
|
||||||
|
|
||||||
String responseBody = "{}";
|
responseStatus = 200;
|
||||||
response.setContentType("application/json");
|
responseBody = "{}";
|
||||||
|
responseContentType = "application/json";
|
||||||
if ("/sessions".equalsIgnoreCase(target)) {
|
if ("/sessions".equalsIgnoreCase(target)) {
|
||||||
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
|
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
|
||||||
} else if ("/sessions/1/statements".equalsIgnoreCase(target)) {
|
} else if ("/sessions/1/statements".equalsIgnoreCase(target)) {
|
||||||
responseBody = "{\"id\": 7}";
|
responseBody = "{\"id\": 7}";
|
||||||
|
}
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
responseStatus = 400;
|
||||||
|
responseContentType = "text/plain";
|
||||||
|
responseBody = "Bad request";
|
||||||
}
|
}
|
||||||
|
|
||||||
response.setContentLength(responseBody.length());
|
|
||||||
|
|
||||||
try (PrintWriter writer = response.getWriter()) {
|
|
||||||
writer.print(responseBody);
|
|
||||||
writer.flush();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
response.setStatus(responseStatus);
|
||||||
|
response.setContentType(responseContentType);
|
||||||
|
response.setContentLength(responseBody.length());
|
||||||
|
|
||||||
|
try (PrintWriter writer = response.getWriter()) {
|
||||||
|
writer.print(responseBody);
|
||||||
|
writer.flush();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TestRunner runner;
|
||||||
|
|
||||||
|
void testCode(TestServer server, String code) throws Exception {
|
||||||
|
server.addHandler(new LivyAPIHandler());
|
||||||
|
|
||||||
|
runner.enqueue(code);
|
||||||
|
runner.run();
|
||||||
|
List<MockFlowFile> waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
|
||||||
|
while (!waitingFlowfiles.isEmpty()) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.enqueue(code);
|
||||||
|
runner.run();
|
||||||
|
waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
|
||||||
|
}
|
||||||
|
runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,26 +17,18 @@
|
||||||
package org.apache.nifi.processors.livy;
|
package org.apache.nifi.processors.livy;
|
||||||
|
|
||||||
import org.apache.nifi.controller.livy.LivySessionController;
|
import org.apache.nifi.controller.livy.LivySessionController;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
|
||||||
import org.apache.nifi.util.TestRunner;
|
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.apache.nifi.web.util.TestServer;
|
import org.apache.nifi.web.util.TestServer;
|
||||||
import org.eclipse.jetty.server.Handler;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase {
|
public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase {
|
||||||
|
|
||||||
public static TestServer server;
|
private static TestServer server;
|
||||||
public static String url;
|
private static String url;
|
||||||
|
|
||||||
public TestRunner runner;
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void beforeClass() throws Exception {
|
public static void beforeClass() throws Exception {
|
||||||
|
@ -52,10 +44,6 @@ public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase
|
||||||
url = server.getUrl();
|
url = server.getUrl();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addHandler(Handler handler) {
|
|
||||||
server.addHandler(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void afterClass() throws Exception {
|
public static void afterClass() throws Exception {
|
||||||
server.shutdownServer();
|
server.shutdownServer();
|
||||||
|
@ -79,25 +67,17 @@ public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase
|
||||||
runner.shutdown();
|
runner.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TestServer createServer() throws IOException {
|
private static TestServer createServer() {
|
||||||
return new TestServer();
|
return new TestServer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSparkSession() throws Exception {
|
public void testSparkSession() throws Exception {
|
||||||
|
testCode(server, "print \"hello world\"");
|
||||||
|
}
|
||||||
|
|
||||||
addHandler(new LivyAPIHandler());
|
@Test
|
||||||
|
public void testSparkSessionWithSpecialChars() throws Exception {
|
||||||
runner.enqueue("print \"hello world\"");
|
testCode(server, "print \"/'?!<>[]{}()$&*=%;.|_-\\\"");
|
||||||
runner.run();
|
|
||||||
List<MockFlowFile> waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
|
|
||||||
while (!waitingFlowfiles.isEmpty()) {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
runner.clearTransferState();
|
|
||||||
runner.enqueue("print \"hello world\"");
|
|
||||||
runner.run();
|
|
||||||
waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
|
|
||||||
}
|
|
||||||
runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,30 +18,23 @@ package org.apache.nifi.processors.livy;
|
||||||
|
|
||||||
import org.apache.nifi.controller.livy.LivySessionController;
|
import org.apache.nifi.controller.livy.LivySessionController;
|
||||||
import org.apache.nifi.ssl.StandardSSLContextService;
|
import org.apache.nifi.ssl.StandardSSLContextService;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
|
||||||
import org.apache.nifi.util.TestRunner;
|
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.apache.nifi.web.util.TestServer;
|
import org.apache.nifi.web.util.TestServer;
|
||||||
import org.eclipse.jetty.server.Handler;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestBase {
|
public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestBase {
|
||||||
|
|
||||||
private static Map<String, String> sslProperties;
|
private static Map<String, String> sslProperties;
|
||||||
|
|
||||||
public static TestServer server;
|
private static TestServer server;
|
||||||
public static String url;
|
private static String url;
|
||||||
|
|
||||||
public TestRunner runner;
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void beforeClass() throws Exception {
|
public static void beforeClass() throws Exception {
|
||||||
|
@ -64,10 +57,6 @@ public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestB
|
||||||
url = server.getSecureUrl();
|
url = server.getSecureUrl();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addHandler(Handler handler) {
|
|
||||||
server.addHandler(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void afterClass() throws Exception {
|
public static void afterClass() throws Exception {
|
||||||
server.shutdownServer();
|
server.shutdownServer();
|
||||||
|
@ -101,25 +90,13 @@ public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestB
|
||||||
runner.shutdown();
|
runner.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TestServer createServer() throws IOException {
|
private static TestServer createServer() {
|
||||||
return new TestServer(sslProperties);
|
return new TestServer(sslProperties);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSslSparkSession() throws Exception {
|
public void testSparkSession() throws Exception {
|
||||||
addHandler(new LivyAPIHandler());
|
testCode(server,"print \"hello world\"");
|
||||||
|
|
||||||
runner.enqueue("print \"hello world\"");
|
|
||||||
runner.run();
|
|
||||||
List<MockFlowFile> waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
|
|
||||||
while (!waitingFlowfiles.isEmpty()) {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
runner.clearTransferState();
|
|
||||||
runner.enqueue("print \"hello world\"");
|
|
||||||
runner.run();
|
|
||||||
waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
|
|
||||||
}
|
|
||||||
runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, String> createSslProperties() {
|
private static Map<String, String> createSslProperties() {
|
||||||
|
|
Loading…
Reference in New Issue