NIFI-9267 Replaced nifi-standard-web-test-utils with mockwebserver

- Replaced instances of custom Jetty TestServer with OkHttp MockWebServer
- Removed integration tests referencing nifi-standard-web-test-utils SslContextUtils
- Removed nifi-standard-web-test-utils

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5620.
This commit is contained in:
exceptionfactory 2021-12-21 18:25:09 -06:00 committed by Joe Gresock
parent 5832dff25e
commit 6039095625
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
30 changed files with 449 additions and 3158 deletions

View File

@ -79,14 +79,9 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>com.squareup.okhttp3</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId> <artifactId>mockwebserver</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -1,95 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3
import com.amazonaws.services.s3.model.ObjectMetadata
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@RunWith(JUnit4.class)
class PutS3ObjectTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(PutS3ObjectTest.class);
private static long mockFlowFileId = 0
private PutS3Object putS3Object
@BeforeClass
static void setUpOnce() {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() {
super.setUp()
putS3Object = new PutS3Object()
}
@After
void tearDown() {
}
@Test
void testShouldIncludeServerSideEncryptionAlgorithmProperty() {
// Arrange
// Act
def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
// Assert
assert ssePropertyDescriptor
assert ssePropertyDescriptor.name == "server-side-encryption"
assert ssePropertyDescriptor.displayName == "Server Side Encryption"
}
@Test
void testShouldValidateServerSideEncryptionDefaultsToNone() {
// Arrange
// Act
def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
// Assert
assert ssePropertyDescriptor
assert ssePropertyDescriptor.defaultValue == putS3Object.NO_SERVER_SIDE_ENCRYPTION
}
@Test
void testShouldValidateServerSideEncryptionAllowableValues() {
// Arrange
// Act
def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
// Assert
assert ssePropertyDescriptor
assert ssePropertyDescriptor.allowableValues*.toString() == [putS3Object.NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION]
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.wag;
import java.util.Optional;
import java.util.function.Predicate;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
public class RequestMatcher<T> extends BaseMatcher<T> {
private final Predicate<T> matcher;
private final Optional<String> description;
public RequestMatcher(Predicate<T> matcher) {
this(matcher, null);
}
public RequestMatcher(Predicate<T> matcher, String description) {
this.matcher = matcher;
this.description = Optional.ofNullable(description);
}
@SuppressWarnings("unchecked")
@Override
public boolean matches(Object argument) {
return matcher.test((T) argument);
}
@Override
public void describeTo(Description description) {
this.description.ifPresent(description::appendText);
}
}

View File

@ -16,43 +16,20 @@
*/ */
package org.apache.nifi.processors.aws.wag; package org.apache.nifi.processors.aws.wag;
import java.io.IOException; import okhttp3.mockwebserver.MockWebServer;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.Test;
public class TestInvokeInvokeAmazonGatewayApi extends TestInvokeAWSGatewayApiCommon {
public class TestInvokeInvokeAmazonGatewayApiWithCredFile extends
TestInvokeAWSGatewayApiCommon {
@BeforeClass
public static void beforeClass() throws Exception {
// useful for verbose logging output
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
// create a Jetty server on a random port
server = createServer();
server.startServer();
// this is the base url with the random port
url = server.getUrl();
}
@AfterClass
public static void afterClass() throws Exception {
server.shutdownServer();
}
@Before @Before
public void before() throws Exception { public void before() throws Exception {
runner = TestRunners.newTestRunner(InvokeAWSGatewayApi.class); runner = TestRunners.newTestRunner(InvokeAWSGatewayApi.class);
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
server.clearHandlers(); setupControllerService();
setupCredFile(); mockWebServer = new MockWebServer();
} }
@After @After
@ -60,7 +37,17 @@ public class TestInvokeInvokeAmazonGatewayApiWithCredFile extends
runner.shutdown(); runner.shutdown();
} }
private static TestServer createServer() throws IOException { @Test
return new TestServer(); public void testStaticCredentials() throws Exception {
runner.clearProperties();
setupAuth();
test200();
}
@Test
public void testCredentialsFile() throws Exception {
runner.clearProperties();
setupCredFile();
test200();
} }
} }

View File

@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.wag;
import java.io.IOException;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public class TestInvokeInvokeAmazonGatewayApiWithControllerService extends
TestInvokeAWSGatewayApiCommon {
@BeforeClass
public static void beforeClass() throws Exception {
// useful for verbose logging output
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
// create a Jetty server on a random port
server = createServer();
server.startServer();
// this is the base url with the random port
url = server.getUrl();
}
@AfterClass
public static void afterClass() throws Exception {
server.shutdownServer();
}
@Before
public void before() throws Exception {
runner = TestRunners.newTestRunner(InvokeAWSGatewayApi.class);
runner.setValidateExpressionUsage(false);
server.clearHandlers();
setupControllerService();
}
@After
public void after() {
runner.shutdown();
}
private static TestServer createServer() throws IOException {
return new TestServer();
}
}

View File

@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.wag;
import java.io.IOException;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public class TestInvokeInvokeAmazonGatewayApiWithStaticAuth extends
TestInvokeAWSGatewayApiCommon {
@BeforeClass
public static void beforeClass() throws Exception {
// useful for verbose logging output
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
// create a Jetty server on a random port
server = createServer();
server.startServer();
// this is the base url with the random port
url = server.getUrl();
}
@AfterClass
public static void afterClass() throws Exception {
server.shutdownServer();
}
@Before
public void before() throws Exception {
runner = TestRunners.newTestRunner(InvokeAWSGatewayApi.class);
runner.setValidateExpressionUsage(false);
server.clearHandlers();
setupAuth();
}
@After
public void after() {
runner.shutdown();
}
private static TestServer createServer() throws IOException {
return new TestServer();
}
}

View File

@ -169,12 +169,6 @@
</dependency> </dependency>
<!-- test dependencies --> <!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId> <artifactId>nifi-security-utils</artifactId>

View File

@ -34,7 +34,6 @@ import org.apache.nifi.ssl.SSLContextService
import org.apache.nifi.util.StringUtils import org.apache.nifi.util.StringUtils
import org.apache.nifi.util.TestRunner import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners import org.apache.nifi.util.TestRunners
import org.apache.nifi.web.util.ssl.SslContextUtils
import org.junit.After import org.junit.After
import org.junit.Assert import org.junit.Assert
import org.junit.Assume import org.junit.Assume
@ -502,26 +501,6 @@ class ElasticSearchClientService_IT {
Assert.assertTrue(ee.isNotFound()) Assert.assertTrue(ee.isNotFound())
} }
@Test
void testSSL() {
final String serviceIdentifier = SSLContextService.class.getName()
final SSLContextService sslContext = mock(SSLContextService.class)
when(sslContext.getIdentifier()).thenReturn(serviceIdentifier)
final SSLContext clientSslContext = SslContextUtils.createSslContext(truststoreTlsConfiguration)
when(sslContext.createContext()).thenReturn(clientSslContext)
when(sslContext.createTlsConfiguration()).thenReturn(truststoreTlsConfiguration)
runner.addControllerService(serviceIdentifier, sslContext)
runner.enableControllerService(sslContext)
runner.disableControllerService(service)
runner.setProperty(service, ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE, serviceIdentifier)
runner.enableControllerService(service)
runner.assertValid(service)
}
@Test @Test
void testNullSuppression() { void testNullSuppression() {
Map<String, Object> doc = new HashMap<String, Object>(){{ Map<String, Object> doc = new HashMap<String, Object>(){{

View File

@ -65,13 +65,9 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.eclipse.jetty</groupId> <groupId>com.squareup.okhttp3</groupId>
<artifactId>jetty-server</artifactId> <artifactId>mockwebserver</artifactId>
<scope>test</scope> <version>${okhttp.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
@ -90,11 +86,5 @@
<artifactId>nifi-ssl-context-service-api</artifactId> <artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,67 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.file.FileUtils;
public class CaptureServlet extends HttpServlet {
private static final long serialVersionUID = 8402271018449653919L;
private volatile byte[] lastPost;
private volatile Map<String, String> lastPostHeaders;
public byte[] getLastPost() {
return lastPost;
}
public Map<String, String> getLastPostHeaders() {
return lastPostHeaders;
}
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Capture all the headers for reference. Intentionally choosing to not special handling for headers with multiple values for clarity
final Enumeration<String> headerNames = request.getHeaderNames();
lastPostHeaders = new HashMap<>();
while (headerNames.hasMoreElements()) {
final String nextHeader = headerNames.nextElement();
lastPostHeaders.put(nextHeader, request.getHeader(nextHeader));
}
try {
StreamUtils.copy(request.getInputStream(), baos);
this.lastPost = baos.toByteArray();
} finally {
FileUtils.closeQuietly(baos);
}
response.setStatus(Status.OK.getStatusCode());
}
}

View File

@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ContentType;
import org.apache.nifi.stream.io.StreamUtils;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
public class PostSlackCaptureServlet extends HttpServlet {
static final String REQUEST_PATH_SUCCESS_TEXT_MSG = "/success/text_msg";
static final String REQUEST_PATH_SUCCESS_FILE_MSG = "/success/file_msg";
static final String REQUEST_PATH_WARNING = "/warning";
static final String REQUEST_PATH_ERROR = "/error";
static final String REQUEST_PATH_EMPTY_JSON = "/empty-json";
static final String REQUEST_PATH_INVALID_JSON = "/invalid-json";
private static final String RESPONSE_SUCCESS_TEXT_MSG = "{\"ok\": true}";
private static final String RESPONSE_SUCCESS_FILE_MSG = "{\"ok\": true, \"file\": {\"url_private\": \"slack-file-url\"}}";
private static final String RESPONSE_WARNING = "{\"ok\": true, \"warning\": \"slack-warning\"}";
private static final String RESPONSE_ERROR = "{\"ok\": false, \"error\": \"slack-error\"}";
private static final String RESPONSE_EMPTY_JSON = "{}";
private static final String RESPONSE_INVALID_JSON = "{invalid-json}";
private static final Map<String, String> RESPONSE_MAP;
static {
RESPONSE_MAP = new HashMap<>();
RESPONSE_MAP.put(REQUEST_PATH_SUCCESS_TEXT_MSG, RESPONSE_SUCCESS_TEXT_MSG);
RESPONSE_MAP.put(REQUEST_PATH_SUCCESS_FILE_MSG, RESPONSE_SUCCESS_FILE_MSG);
RESPONSE_MAP.put(REQUEST_PATH_WARNING, RESPONSE_WARNING);
RESPONSE_MAP.put(REQUEST_PATH_ERROR, RESPONSE_ERROR);
RESPONSE_MAP.put(REQUEST_PATH_EMPTY_JSON, RESPONSE_EMPTY_JSON);
RESPONSE_MAP.put(REQUEST_PATH_INVALID_JSON, RESPONSE_INVALID_JSON);
}
private volatile boolean interacted;
private volatile Map<String, String> lastPostHeaders;
private volatile byte[] lastPostBody;
public Map<String, String> getLastPostHeaders() {
return lastPostHeaders;
}
public byte[] getLastPostBody() {
return lastPostBody;
}
public boolean hasBeenInteracted() {
return interacted;
}
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
interacted = true;
Enumeration<String> headerNames = request.getHeaderNames();
lastPostHeaders = new HashMap<>();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
lastPostHeaders.put(headerName, request.getHeader(headerName));
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
StreamUtils.copy(request.getInputStream(), baos);
lastPostBody = baos.toByteArray();
String responseJson = RESPONSE_MAP.get(request.getPathInfo());
if (responseJson != null) {
response.setContentType(ContentType.APPLICATION_JSON.toString());
PrintWriter out = response.getWriter();
out.print(responseJson);
out.close();
} else {
response.setStatus(HttpStatus.SC_BAD_REQUEST);
}
}
}

View File

@ -16,52 +16,47 @@
*/ */
package org.apache.nifi.processors.slack; package org.apache.nifi.processors.slack;
import okhttp3.Headers;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.nio.charset.Charset; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.nifi.processors.slack.PostSlackCaptureServlet.REQUEST_PATH_SUCCESS_FILE_MSG;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class PostSlackFileMessageTest { public class PostSlackFileMessageTest {
private static final String RESPONSE_SUCCESS_FILE_MSG = "{\"ok\": true, \"file\": {\"url_private\": \"slack-file-url\"}}";
private TestRunner testRunner; private TestRunner testRunner;
private TestServer server; private MockWebServer mockWebServer;
private PostSlackCaptureServlet servlet;
private String url;
@BeforeEach @BeforeEach
public void setup() throws Exception { public void init() {
mockWebServer = new MockWebServer();
url = mockWebServer.url("/").toString();
testRunner = TestRunners.newTestRunner(PostSlack.class); testRunner = TestRunners.newTestRunner(PostSlack.class);
servlet = new PostSlackCaptureServlet();
ServletContextHandler handler = new ServletContextHandler();
handler.addServlet(new ServletHolder(servlet), "/*");
server = new TestServer();
server.addHandler(handler);
server.startServer();
} }
@Test @Test
public void sendMessageWithBasicPropertiesSuccessfully() { public void sendMessageWithBasicPropertiesSuccessfully() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, server.getUrl() + REQUEST_PATH_SUCCESS_FILE_MSG); testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES); testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
@ -70,6 +65,8 @@ public class PostSlackFileMessageTest {
flowFileAttributes.put(CoreAttributes.FILENAME.key(), "my-file-name"); flowFileAttributes.put(CoreAttributes.FILENAME.key(), "my-file-name");
flowFileAttributes.put(CoreAttributes.MIME_TYPE.key(), "image/png"); flowFileAttributes.put(CoreAttributes.MIME_TYPE.key(), "image/png");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
// in order not to make the assertion logic (even more) complicated, the file content is tested with character data instead of binary data // in order not to make the assertion logic (even more) complicated, the file content is tested with character data instead of binary data
testRunner.enqueue("my-data", flowFileAttributes); testRunner.enqueue("my-data", flowFileAttributes);
testRunner.run(1); testRunner.run(1);
@ -83,8 +80,8 @@ public class PostSlackFileMessageTest {
} }
@Test @Test
public void sendMessageWithAllPropertiesSuccessfully() { public void sendMessageWithAllPropertiesSuccessfully() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, server.getUrl() + REQUEST_PATH_SUCCESS_FILE_MSG); testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
@ -93,6 +90,8 @@ public class PostSlackFileMessageTest {
testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name"); testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name");
testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "image/png"); testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "image/png");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue("my-data"); testRunner.enqueue("my-data");
testRunner.run(1); testRunner.run(1);
@ -106,28 +105,30 @@ public class PostSlackFileMessageTest {
@Test @Test
public void processShouldFailWhenChannelIsEmpty() { public void processShouldFailWhenChannelIsEmpty() {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, server.getUrl()); testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "${dummy}"); testRunner.setProperty(PostSlack.CHANNEL, "${dummy}");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES); testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{}"));
testRunner.enqueue("my-data"); testRunner.enqueue("my-data");
testRunner.run(1); testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE); testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
assertFalse(servlet.hasBeenInteracted());
} }
@Test @Test
public void fileNameShouldHaveFallbackValueWhenEmpty() { public void fileNameShouldHaveFallbackValueWhenEmpty() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, server.getUrl() + REQUEST_PATH_SUCCESS_FILE_MSG); testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES); testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_NAME, "${dummy}"); testRunner.setProperty(PostSlack.FILE_NAME, "${dummy}");
testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "image/png"); testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "image/png");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue("my-data"); testRunner.enqueue("my-data");
testRunner.run(1); testRunner.run(1);
@ -141,14 +142,16 @@ public class PostSlackFileMessageTest {
} }
@Test @Test
public void mimeTypeShouldHaveFallbackValueWhenEmpty() { public void mimeTypeShouldHaveFallbackValueWhenEmpty() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, server.getUrl() + REQUEST_PATH_SUCCESS_FILE_MSG); testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES); testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name"); testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name");
testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "${dummy}"); testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "${dummy}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue("my-data"); testRunner.enqueue("my-data");
testRunner.run(1); testRunner.run(1);
@ -162,14 +165,16 @@ public class PostSlackFileMessageTest {
} }
@Test @Test
public void mimeTypeShouldHaveFallbackValueWhenInvalid() { public void mimeTypeShouldHaveFallbackValueWhenInvalid() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, server.getUrl() + REQUEST_PATH_SUCCESS_FILE_MSG); testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES); testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name"); testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name");
testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "invalid"); testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "invalid");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue("my-data"); testRunner.enqueue("my-data");
testRunner.run(1); testRunner.run(1);
@ -183,20 +188,27 @@ public class PostSlackFileMessageTest {
} }
@Test @Test
public void sendInternationalMessageSuccessfully() { public void sendInternationalMessageSuccessfully() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, server.getUrl() + REQUEST_PATH_SUCCESS_FILE_MSG); testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "Iñtërnâtiônàližætiøn"); testRunner.setProperty(PostSlack.TEXT, "Iñtërnâtiônàližætiøn");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES); testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_TITLE, "Iñtërnâtiônàližætiøn"); testRunner.setProperty(PostSlack.FILE_TITLE, "Iñtërnâtiônàližætiøn");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS); testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
Map<String, String> parts = parsePostBodyParts(parseMultipartBoundary(servlet.getLastPostHeaders().get("Content-Type"))); final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String body = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
final Headers headers = recordedRequest.getHeaders();
Map<String, String> parts = parsePostBodyParts(parseMultipartBoundary(headers.get("Content-Type")), body);
assertEquals("Iñtërnâtiônàližætiøn", parts.get("initial_comment")); assertEquals("Iñtërnâtiônàližætiøn", parts.get("initial_comment"));
assertEquals("Iñtërnâtiônàližætiøn", parts.get("title")); assertEquals("Iñtërnâtiônàližætiøn", parts.get("title"));
@ -204,17 +216,20 @@ public class PostSlackFileMessageTest {
assertEquals("slack-file-url", flowFileOut.getAttribute("slack.file.url")); assertEquals("slack-file-url", flowFileOut.getAttribute("slack.file.url"));
} }
private void assertRequest(String fileName, String mimeType, String text, String title) { private void assertRequest(String fileName, String mimeType, String text, String title) throws InterruptedException {
Map<String, String> requestHeaders = servlet.getLastPostHeaders(); final RecordedRequest recordedRequest = mockWebServer.takeRequest();
assertEquals("Bearer my-access-token", requestHeaders.get("Authorization")); final String body = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
String contentType = requestHeaders.get("Content-Type"); final Headers headers = recordedRequest.getHeaders();
assertEquals("Bearer my-access-token", headers.get("Authorization"));
String contentType = headers.get("Content-Type");
assertTrue(contentType.startsWith("multipart/form-data")); assertTrue(contentType.startsWith("multipart/form-data"));
String boundary = parseMultipartBoundary(contentType); String boundary = parseMultipartBoundary(contentType);
assertNotNull(boundary, "Multipart boundary not found in Content-Type header: " + contentType); assertNotNull(boundary, "Multipart boundary not found in Content-Type header: " + contentType);
Map<String, String> parts = parsePostBodyParts(boundary); Map<String, String> parts = parsePostBodyParts(boundary, body);
assertNotNull(parts.get("channels"), "'channels' parameter not found in the POST request body"); assertNotNull(parts.get("channels"), "'channels' parameter not found in the POST request body");
assertEquals("my-channel", parts.get("channels"), "'channels' parameter has wrong value"); assertEquals("my-channel", parts.get("channels"), "'channels' parameter has wrong value");
@ -234,7 +249,8 @@ public class PostSlackFileMessageTest {
assertNotNull(parts.get("file"), "The file part not found in the POST request body"); assertNotNull(parts.get("file"), "The file part not found in the POST request body");
Map<String, String> fileParameters = parseFilePart(boundary);
Map<String, String> fileParameters = parseFilePart(boundary, body);
assertEquals("my-data", fileParameters.get("data"), "File data is wrong in the POST request body"); assertEquals("my-data", fileParameters.get("data"), "File data is wrong in the POST request body");
assertEquals(fileName, fileParameters.get("filename"), "'filename' attribute of the file part has wrong value"); assertEquals(fileName, fileParameters.get("filename"), "'filename' attribute of the file part has wrong value");
assertEquals(mimeType, fileParameters.get("contentType"), "Content-Type of the file part is wrong"); assertEquals(mimeType, fileParameters.get("contentType"), "Content-Type of the file part is wrong");
@ -253,11 +269,11 @@ public class PostSlackFileMessageTest {
return boundary; return boundary;
} }
private Map<String, String> parsePostBodyParts(String boundary) { private Map<String, String> parsePostBodyParts(String boundary, String body) {
Pattern partNamePattern = Pattern.compile("name=\"(.*?)\""); Pattern partNamePattern = Pattern.compile("name=\"(.*?)\"");
Pattern partDataPattern = Pattern.compile("\r\n\r\n(.*?)\r\n$"); Pattern partDataPattern = Pattern.compile("\r\n\r\n(.*?)\r\n$");
String[] postBodyParts = new String(servlet.getLastPostBody(), Charset.forName("UTF-8")).split(boundary); String[] postBodyParts = body.split(boundary);
Map<String, String> parts = new HashMap<>(); Map<String, String> parts = new HashMap<>();
@ -276,13 +292,13 @@ public class PostSlackFileMessageTest {
return parts; return parts;
} }
private Map<String, String> parseFilePart(String boundary) { private Map<String, String> parseFilePart(String boundary, String body) {
Pattern partNamePattern = Pattern.compile("name=\"file\""); Pattern partNamePattern = Pattern.compile("name=\"file\"");
Pattern partDataPattern = Pattern.compile("\r\n\r\n(.*?)\r\n$"); Pattern partDataPattern = Pattern.compile("\r\n\r\n(.*?)\r\n$");
Pattern partFilenamePattern = Pattern.compile("filename=\"(.*?)\""); Pattern partFilenamePattern = Pattern.compile("filename=\"(.*?)\"");
Pattern partContentTypePattern = Pattern.compile("Content-Type: (.*?)\r\n"); Pattern partContentTypePattern = Pattern.compile("Content-Type: (.*?)\r\n");
String[] postBodyParts = new String(servlet.getLastPostBody(), Charset.forName("UTF-8")).split(boundary); String[] postBodyParts = body.split(boundary);
Map<String, String> fileParameters = new HashMap<>(); Map<String, String> fileParameters = new HashMap<>();

View File

@ -16,57 +16,51 @@
*/ */
package org.apache.nifi.processors.slack; package org.apache.nifi.processors.slack;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.json.Json; import javax.json.Json;
import javax.json.JsonObject; import javax.json.JsonObject;
import java.io.ByteArrayInputStream; import javax.json.JsonReader;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Map;
import static org.apache.nifi.processors.slack.PostSlackCaptureServlet.REQUEST_PATH_EMPTY_JSON;
import static org.apache.nifi.processors.slack.PostSlackCaptureServlet.REQUEST_PATH_ERROR;
import static org.apache.nifi.processors.slack.PostSlackCaptureServlet.REQUEST_PATH_INVALID_JSON;
import static org.apache.nifi.processors.slack.PostSlackCaptureServlet.REQUEST_PATH_SUCCESS_TEXT_MSG;
import static org.apache.nifi.processors.slack.PostSlackCaptureServlet.REQUEST_PATH_WARNING;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class PostSlackTextMessageTest { public class PostSlackTextMessageTest {
private static final String RESPONSE_SUCCESS_TEXT_MSG = "{\"ok\": true}";
private static final String RESPONSE_SUCCESS_FILE_MSG = "{\"ok\": true, \"file\": {\"url_private\": \"slack-file-url\"}}";
private static final String RESPONSE_WARNING = "{\"ok\": true, \"warning\": \"slack-warning\"}";
private static final String RESPONSE_ERROR = "{\"ok\": false, \"error\": \"slack-error\"}";
private static final String RESPONSE_EMPTY_JSON = "{}";
private static final String RESPONSE_INVALID_JSON = "{invalid-json}";
private TestRunner testRunner; private TestRunner testRunner;
private TestServer server; private MockWebServer mockWebServer;
private PostSlackCaptureServlet servlet;
private String url;
@BeforeEach @BeforeEach
public void setup() throws Exception { public void init() {
mockWebServer = new MockWebServer();
url = mockWebServer.url("/").toString();
testRunner = TestRunners.newTestRunner(PostSlack.class); testRunner = TestRunners.newTestRunner(PostSlack.class);
servlet = new PostSlackCaptureServlet();
ServletContextHandler handler = new ServletContextHandler();
handler.addServlet(new ServletHolder(servlet), "/*");
server = new TestServer();
server.addHandler(handler);
server.startServer();
} }
@Test @Test
public void sendTextOnlyMessageSuccessfully() { public void sendTextOnlyMessageSuccessfully() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_SUCCESS_TEXT_MSG); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_TEXT_MSG));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -79,12 +73,14 @@ public class PostSlackTextMessageTest {
@Test @Test
public void sendTextWithAttachmentMessageSuccessfully() { public void sendTextWithAttachmentMessageSuccessfully() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_SUCCESS_TEXT_MSG); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.setProperty("attachment_01", "{\"my-attachment-key\": \"my-attachment-value\"}"); testRunner.setProperty("attachment_01", "{\"my-attachment-key\": \"my-attachment-value\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -98,11 +94,13 @@ public class PostSlackTextMessageTest {
@Test @Test
public void sendAttachmentOnlyMessageSuccessfully() { public void sendAttachmentOnlyMessageSuccessfully() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_SUCCESS_TEXT_MSG); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty("attachment_01", "{\"my-attachment-key\": \"my-attachment-value\"}"); testRunner.setProperty("attachment_01", "{\"my-attachment-key\": \"my-attachment-value\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -115,7 +113,7 @@ public class PostSlackTextMessageTest {
@Test @Test
public void processShouldFailWhenChannelIsEmpty() { public void processShouldFailWhenChannelIsEmpty() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl()); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "${dummy}"); testRunner.setProperty(PostSlack.CHANNEL, "${dummy}");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
@ -124,13 +122,11 @@ public class PostSlackTextMessageTest {
testRunner.run(1); testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE); testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
assertFalse(servlet.hasBeenInteracted());
} }
@Test @Test
public void processShouldFailWhenTextIsEmptyAndNoAttachmentSpecified() { public void processShouldFailWhenTextIsEmptyAndNoAttachmentSpecified() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl()); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "${dummy}"); testRunner.setProperty(PostSlack.TEXT, "${dummy}");
@ -139,18 +135,18 @@ public class PostSlackTextMessageTest {
testRunner.run(1); testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE); testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
assertFalse(servlet.hasBeenInteracted());
} }
@Test @Test
public void emptyAttachmentShouldBeSkipped() { public void emptyAttachmentShouldBeSkipped() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_SUCCESS_TEXT_MSG); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty("attachment_01", "${dummy}"); testRunner.setProperty("attachment_01", "${dummy}");
testRunner.setProperty("attachment_02", "{\"my-attachment-key\": \"my-attachment-value\"}"); testRunner.setProperty("attachment_02", "{\"my-attachment-key\": \"my-attachment-value\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -164,12 +160,14 @@ public class PostSlackTextMessageTest {
@Test @Test
public void invalidAttachmentShouldBeSkipped() { public void invalidAttachmentShouldBeSkipped() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_SUCCESS_TEXT_MSG); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty("attachment_01", "{invalid-json}"); testRunner.setProperty("attachment_01", "{invalid-json}");
testRunner.setProperty("attachment_02", "{\"my-attachment-key\": \"my-attachment-value\"}"); testRunner.setProperty("attachment_02", "{\"my-attachment-key\": \"my-attachment-value\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -183,11 +181,13 @@ public class PostSlackTextMessageTest {
@Test @Test
public void processShouldFailWhenHttpErrorCodeReturned() { public void processShouldFailWhenHttpErrorCodeReturned() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl()); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(500));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -196,11 +196,13 @@ public class PostSlackTextMessageTest {
@Test @Test
public void processShouldFailWhenSlackReturnsError() { public void processShouldFailWhenSlackReturnsError() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_ERROR); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_ERROR));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -209,11 +211,13 @@ public class PostSlackTextMessageTest {
@Test @Test
public void processShouldNotFailWhenSlackReturnsWarning() { public void processShouldNotFailWhenSlackReturnsWarning() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_WARNING); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_WARNING));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -224,11 +228,13 @@ public class PostSlackTextMessageTest {
@Test @Test
public void processShouldFailWhenSlackReturnsEmptyJson() { public void processShouldFailWhenSlackReturnsEmptyJson() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_EMPTY_JSON); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_EMPTY_JSON));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -237,11 +243,13 @@ public class PostSlackTextMessageTest {
@Test @Test
public void processShouldFailWhenSlackReturnsInvalidJson() { public void processShouldFailWhenSlackReturnsInvalidJson() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_INVALID_JSON); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text"); testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_INVALID_JSON));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -250,11 +258,13 @@ public class PostSlackTextMessageTest {
@Test @Test
public void sendInternationalMessageSuccessfully() { public void sendInternationalMessageSuccessfully() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, server.getUrl() + REQUEST_PATH_SUCCESS_TEXT_MSG); testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token"); testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel"); testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "Iñtërnâtiônàližætiøn"); testRunner.setProperty(PostSlack.TEXT, "Iñtërnâtiônàližætiøn");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_TEXT_MSG));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
@ -266,17 +276,17 @@ public class PostSlackTextMessageTest {
} }
private void assertBasicRequest(JsonObject requestBodyJson) { private void assertBasicRequest(JsonObject requestBodyJson) {
Map<String, String> requestHeaders = servlet.getLastPostHeaders();
assertEquals("Bearer my-access-token", requestHeaders.get("Authorization"));
assertEquals("application/json; charset=UTF-8", requestHeaders.get("Content-Type"));
assertEquals("my-channel", requestBodyJson.getString("channel")); assertEquals("my-channel", requestBodyJson.getString("channel"));
} }
private JsonObject getRequestBodyJson() { private JsonObject getRequestBodyJson() {
return Json.createReader( try {
new InputStreamReader( final RecordedRequest recordedRequest = mockWebServer.takeRequest();
new ByteArrayInputStream(servlet.getLastPostBody()), Charset.forName("UTF-8"))) try (final JsonReader reader = Json.createReader(new InputStreamReader(recordedRequest.getBody().inputStream()))) {
.readObject(); return reader.readObject();
}
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -16,17 +16,18 @@
*/ */
package org.apache.nifi.processors.slack; package org.apache.nifi.processors.slack;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
import org.eclipse.jetty.servlet.ServletHandler;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Arrays; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -38,29 +39,22 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class PutSlackTest { public class PutSlackTest {
private TestRunner testRunner; private TestRunner testRunner;
private TestServer server;
private CaptureServlet servlet;
public static final String WEBHOOK_TEST_TEXT = "Hello From Apache NiFi"; public static final String WEBHOOK_TEST_TEXT = "Hello From Apache NiFi";
private MockWebServer mockWebServer;
private String url;
@BeforeEach @BeforeEach
public void init() throws Exception { public void init() {
mockWebServer = new MockWebServer();
url = mockWebServer.url("/").toString();
testRunner = TestRunners.newTestRunner(PutSlack.class); testRunner = TestRunners.newTestRunner(PutSlack.class);
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(CaptureServlet.class, "/*");
// create the service
server = new TestServer();
server.addHandler(handler);
server.startServer();
servlet = (CaptureServlet) handler.getServlets()[0].getServlet();
} }
@Test @Test
public void testBlankText() { public void testBlankText() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, ""); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "");
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
@ -69,7 +63,7 @@ public class PutSlackTest {
@Test @Test
public void testBlankTextViaExpression() { public void testBlankTextViaExpression() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "${invalid-attr}"); // Create a blank webhook text testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "${invalid-attr}"); // Create a blank webhook text
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
@ -79,7 +73,7 @@ public class PutSlackTest {
@Test @Test
public void testInvalidChannel() { public void testInvalidChannel() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.CHANNEL, "invalid"); testRunner.setProperty(PutSlack.CHANNEL, "invalid");
@ -90,7 +84,7 @@ public class PutSlackTest {
@Test @Test
public void testInvalidIconUrl() { public void testInvalidIconUrl() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.ICON_URL, "invalid"); testRunner.setProperty(PutSlack.ICON_URL, "invalid");
@ -100,7 +94,7 @@ public class PutSlackTest {
@Test @Test
public void testInvalidIconEmoji() { public void testInvalidIconEmoji() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.ICON_EMOJI, "invalid"); testRunner.setProperty(PutSlack.ICON_EMOJI, "invalid");
@ -110,7 +104,7 @@ public class PutSlackTest {
@Test @Test
public void testInvalidDynamicProperties() { public void testInvalidDynamicProperties() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder() PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true) .dynamic(true)
@ -125,7 +119,7 @@ public class PutSlackTest {
@Test @Test
public void testValidDynamicProperties() { public void testValidDynamicProperties() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder() PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true) .dynamic(true)
@ -133,6 +127,8 @@ public class PutSlackTest {
.build(); .build();
testRunner.setProperty(dynamicProp, "{\"a\": \"a\"}"); testRunner.setProperty(dynamicProp, "{\"a\": \"a\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue("{}".getBytes()); testRunner.enqueue("{}".getBytes());
testRunner.run(1); testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_FAILURE, 0); testRunner.assertTransferCount(PutSlack.REL_FAILURE, 0);
@ -147,7 +143,7 @@ public class PutSlackTest {
props.put("ping", "pong"); props.put("ping", "pong");
ff = session.putAllAttributes(ff, props); ff = session.putAllAttributes(ff, props);
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder() PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true) .dynamic(true)
@ -155,6 +151,8 @@ public class PutSlackTest {
.build(); .build();
testRunner.setProperty(dynamicProp, "{\"foo\": ${foo}, \"ping\":\"${ping}\"}"); testRunner.setProperty(dynamicProp, "{\"foo\": ${foo}, \"ping\":\"${ping}\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(ff); testRunner.enqueue(ff);
testRunner.run(1); testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_SUCCESS, 1); testRunner.assertTransferCount(PutSlack.REL_SUCCESS, 1);
@ -169,7 +167,7 @@ public class PutSlackTest {
props.put("ping", "\"pong"); props.put("ping", "\"pong");
ff = session.putAllAttributes(ff, props); ff = session.putAllAttributes(ff, props);
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder() PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true) .dynamic(true)
@ -184,7 +182,7 @@ public class PutSlackTest {
} }
@Test @Test
public void testGetPropertyDescriptors() throws Exception { public void testGetPropertyDescriptors() {
PutSlack processor = new PutSlack(); PutSlack processor = new PutSlack();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals(6, pd.size(), "size should be eq"); assertEquals(6, pd.size(), "size should be eq");
@ -197,64 +195,80 @@ public class PutSlackTest {
} }
@Test @Test
public void testSimplePut() { public void testSimplePut() throws InterruptedException {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
byte[] expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D".getBytes(); String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D";
assertTrue(Arrays.equals(expected, servlet.getLastPost())); final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String requestBody = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
assertEquals(expected, requestBody);
} }
@Test @Test
public void testSimplePutWithAttributes() { public void testSimplePutWithAttributes() throws InterruptedException {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes"); testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes");
testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook"); testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook");
testRunner.setProperty(PutSlack.ICON_EMOJI, ":smile:"); testRunner.setProperty(PutSlack.ICON_EMOJI, ":smile:");
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes%22%2C%22username%22%3A%22" + final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes%22%2C%22username%22%3A%22" +
"integration-test-webhook%22%2C%22icon_emoji%22%3A%22%3Asmile%3A%22%7D"; "integration-test-webhook%22%2C%22icon_emoji%22%3A%22%3Asmile%3A%22%7D";
assertTrue(Arrays.equals(expected.getBytes(), servlet.getLastPost())); final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String requestBody = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
assertEquals(expected, requestBody);
} }
@Test @Test
public void testSimplePutWithAttributesIconURL() { public void testSimplePutWithAttributesIconURL() throws InterruptedException {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes-url"); testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes-url");
testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook"); testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook");
testRunner.setProperty(PutSlack.ICON_URL, "http://lorempixel.com/48/48/"); testRunner.setProperty(PutSlack.ICON_URL, "http://lorempixel.com/48/48/");
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(new byte[0]); testRunner.enqueue(new byte[0]);
testRunner.run(1); testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes-url%22%2C%22username%22%3A%22" final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes-url%22%2C%22username%22%3A%22"
+ "integration-test-webhook%22%2C%22icon_url%22%3A%22http%3A%2F%2Florempixel.com%2F48%2F48%2F%22%7D"; + "integration-test-webhook%22%2C%22icon_url%22%3A%22http%3A%2F%2Florempixel.com%2F48%2F48%2F%22%7D";
assertTrue(Arrays.equals(expected.getBytes(), servlet.getLastPost())); final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String requestBody = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
assertEquals(expected, requestBody);
} }
@Test @Test
public void testSimplePutWithEL() { public void testSimplePutWithEL() throws InterruptedException {
testRunner.setProperty(PutSlack.WEBHOOK_URL, "${slack.url}"); testRunner.setProperty(PutSlack.WEBHOOK_URL, "${slack.url}");
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT); testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(new byte[0], new HashMap<String,String>(){{ testRunner.enqueue(new byte[0], new HashMap<String,String>(){{
put("slack.url", server.getUrl()); put("slack.url", url);
}}); }});
testRunner.run(1); testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
byte[] expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D".getBytes(); String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D";
assertTrue(Arrays.equals(expected, servlet.getLastPost())); final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String requestBody = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
assertEquals(expected, requestBody);
} }
} }

View File

@ -79,12 +79,6 @@
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId> <artifactId>commons-text</artifactId>

View File

@ -1,125 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.livy;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.web.util.TestServer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
class ExecuteSparkInteractiveTestBase {
public static class LivyAPIHandler extends AbstractHandler {
int session1Requests = 0;
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
int responseStatus = 404;
String responseContentType = "text/plain";
String responseBody = "Not found";
if ("GET".equalsIgnoreCase(request.getMethod())) {
responseStatus = 200;
responseBody = "{}";
responseContentType = "application/json";
if ("/sessions".equalsIgnoreCase(target)) {
responseBody = "{\"sessions\": [{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}]}";
} else if (target.startsWith("/sessions/") && !target.contains("statement")) {
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
} else if ("/sessions/1/statements/7".equalsIgnoreCase(target)) {
switch (session1Requests) {
case 0:
responseBody = "{\"state\": \"waiting\"}";
break;
case 1:
responseBody = "{\"state\": \"running\"}";
break;
case 2:
responseBody = "{\"state\": \"available\", \"output\": {\"data\": {\"text/plain\": \"Hello world\"}}}";
break;
default:
responseBody = "{\"state\": \"error\"}";
break;
}
session1Requests++;
}
} else if ("POST".equalsIgnoreCase(request.getMethod())) {
String requestBody = IOUtils.toString(request.getReader());
try {
// validate JSON payload
new ObjectMapper().readTree(requestBody);
responseStatus = 200;
responseBody = "{}";
responseContentType = "application/json";
if ("/sessions".equalsIgnoreCase(target)) {
responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}";
} else if ("/sessions/1/statements".equalsIgnoreCase(target)) {
responseBody = "{\"id\": 7}";
}
} catch (JsonProcessingException e) {
responseStatus = 400;
responseContentType = "text/plain";
responseBody = "Bad request";
}
}
response.setStatus(responseStatus);
response.setContentType(responseContentType);
response.setContentLength(responseBody.length());
try (PrintWriter writer = response.getWriter()) {
writer.print(responseBody);
writer.flush();
}
}
}
TestRunner runner;
void testCode(TestServer server, String code) throws Exception {
server.addHandler(new LivyAPIHandler());
runner.enqueue(code);
runner.run();
List<MockFlowFile> waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
while (!waitingFlowfiles.isEmpty()) {
runner.assertTransferCount(ExecuteSparkInteractive.REL_FAILURE, 0);
Thread.sleep(1000);
runner.clearTransferState();
runner.enqueue(code);
runner.run();
waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
}
runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
}
}

View File

@ -16,50 +16,52 @@
*/ */
package org.apache.nifi.processors.livy; package org.apache.nifi.processors.livy;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHttpResponse;
import org.apache.nifi.controller.api.livy.LivySessionService;
import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
import org.apache.nifi.controller.livy.LivySessionController; import org.apache.nifi.controller.livy.LivySessionController;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase { import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
private static TestServer server; import static org.mockito.ArgumentMatchers.isA;
private static String url; import static org.mockito.Mockito.when;
@BeforeAll @ExtendWith(MockitoExtension.class)
public static void beforeClass() throws Exception { public class TestExecuteSparkInteractive {
// useful for verbose logging output @Mock
// don't commit this with this property enabled, or any 'mvn test' will be really verbose private LivySessionService livySessionService;
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
// create a Jetty server on a random port @Mock
server = createServer(); private HttpClient httpClient;
server.startServer();
// this is the base url with the random port private TestRunner runner;
url = server.getUrl();
}
@AfterAll
public static void afterClass() throws Exception {
server.shutdownServer();
}
@BeforeEach @BeforeEach
public void before() throws Exception { public void before() throws Exception {
final String identifier = LivySessionController.class.getSimpleName();
runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class); runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class);
LivySessionController livyControllerService = new LivySessionController();
runner.addControllerService("livyCS", livyControllerService);
runner.setProperty(livyControllerService, LivySessionController.LIVY_HOST, url.substring(url.indexOf("://") + 3, url.lastIndexOf(":")));
runner.setProperty(livyControllerService, LivySessionController.LIVY_PORT, url.substring(url.lastIndexOf(":") + 1));
runner.enableControllerService(livyControllerService);
runner.setProperty(ExecuteSparkInteractive.LIVY_CONTROLLER_SERVICE, "livyCS");
server.clearHandlers(); when(livySessionService.getIdentifier()).thenReturn(identifier);
runner.addControllerService(identifier, livySessionService);
runner.enableControllerService(livySessionService);
runner.setProperty(ExecuteSparkInteractive.LIVY_CONTROLLER_SERVICE, identifier);
} }
@AfterEach @AfterEach
@ -67,17 +69,47 @@ public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase
runner.shutdown(); runner.shutdown();
} }
private static TestServer createServer() { @Test
return new TestServer(); public void testSparkSession() throws SessionManagerException, IOException {
testCode("print \"hello world\"");
} }
@Test @Test
public void testSparkSession() throws Exception { public void testSparkSessionWithSpecialChars() throws SessionManagerException, IOException {
testCode(server, "print \"hello world\""); testCode("print \"/'?!<>[]{}()$&*=%;.|_-\\\"");
} }
@Test private void testCode(final String code) throws SessionManagerException, IOException {
public void testSparkSessionWithSpecialChars() throws Exception { runner.enqueue(code);
testCode(server, "print \"/'?!<>[]{}()$&*=%;.|_-\\\""); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSparkInteractive.REL_WAIT);
final String sessionId = "1";
final Map<String, String> sessions = new LinkedHashMap<>();
sessions.put("sessionId", sessionId);
when(livySessionService.getSession()).thenReturn(sessions);
when(livySessionService.getConnection()).thenReturn(httpClient);
final HttpResponse jobId = getSuccessResponse();
jobId.setEntity(new StringEntity("{\"id\":\"1\"}"));
when(httpClient.execute(isA(HttpPost.class))).thenReturn(jobId);
final HttpResponse jobState = getSuccessResponse();
final String dataObject = "{\"completed\":1}";
jobState.setEntity(new StringEntity(String.format("{\"state\":\"available\", \"output\":{\"data\":%s}}", dataObject)));
when(httpClient.execute(isA(HttpGet.class))).thenReturn(jobState);
runner.clearTransferState();
runner.enqueue(code);
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSparkInteractive.REL_SUCCESS);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_SUCCESS).iterator().next();
flowFile.assertContentEquals(dataObject);
}
private HttpResponse getSuccessResponse() {
return new BasicHttpResponse(HttpVersion.HTTP_1_1, 200, "OK");
} }
} }

View File

@ -369,12 +369,6 @@
<artifactId>nifi-database-utils</artifactId> <artifactId>nifi-database-utils</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.sshd</groupId> <groupId>org.apache.sshd</groupId>
<artifactId>sshd-core</artifactId> <artifactId>sshd-core</artifactId>

View File

@ -1,533 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.URLEncoder;
import javax.net.ssl.SSLContext;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.JettyServerUtils;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Integration Test for deprecated GetHTTP Processor
*/
@SuppressWarnings("deprecation")
public class ITGetHTTP {
private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
private static final String HTTP_URL = "http://localhost:%d";
private static final String HTTPS_URL = "https://localhost:%d";
private static SSLContext keyStoreSslContext;
private static SSLContext trustStoreSslContext;
private TestRunner controller;
@BeforeClass
public static void configureServices() throws TlsException {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.GetHTTP", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestGetHTTP", "debug");
keyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
trustStoreSslContext = SslContextUtils.createTrustStoreSslContext();
}
@Test
public final void testContentModified() throws Exception {
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(RESTServiceContentModified.class, "/*");
// create the service
final int port = NetworkUtils.availablePort();
final Server server = JettyServerUtils.createServer(port, null, null);
server.setHandler(handler);
try {
JettyServerUtils.startServer(server);
// this is the base url with the random port
String destination = String.format(HTTP_URL, port);
// set up NiFi mock controller
controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "5 secs");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "testFile");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
controller.getStateManager().assertStateNotSet(org.apache.nifi.processors.standard.GetHTTP.ETAG, Scope.LOCAL);
controller.getStateManager().assertStateNotSet(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED, Scope.LOCAL);
controller.run(2);
// verify the lastModified and entityTag are updated
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.ETAG+":"+destination, "", Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED+":"+destination, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
// ran twice, but got one...which is good
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
// verify remote.source flowfile attribute
controller.getFlowFilesForRelationship(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS).get(0).assertAttributeEquals("gethttp.remote.source", "localhost");
controller.clearTransferState();
// turn off checking for etag and lastModified
RESTServiceContentModified.IGNORE_ETAG = true;
RESTServiceContentModified.IGNORE_LAST_MODIFIED = true;
controller.run(2);
// ran twice, got two...which is good
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 2);
controller.clearTransferState();
// turn on checking for etag
RESTServiceContentModified.IGNORE_ETAG = false;
controller.run(2);
// ran twice, got 0...which is good
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 0);
// turn on checking for lastModified, but off for etag
RESTServiceContentModified.IGNORE_LAST_MODIFIED = false;
RESTServiceContentModified.IGNORE_ETAG = true;
controller.run(2);
// ran twice, got 0...which is good
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 0);
// turn off checking for lastModified, turn on checking for etag, but change the value
RESTServiceContentModified.IGNORE_LAST_MODIFIED = true;
RESTServiceContentModified.IGNORE_ETAG = false;
RESTServiceContentModified.ETAG = 1;
controller.run(2);
// ran twice, got 1...but should have new cached etag
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
String eTagStateValue = controller.getStateManager().getState(Scope.LOCAL).get(org.apache.nifi.processors.standard.GetHTTP.ETAG+":"+destination);
assertEquals("1",org.apache.nifi.processors.standard.GetHTTP.parseStateValue(eTagStateValue).getValue());
controller.clearTransferState();
// turn off checking for Etag, turn on checking for lastModified, but change value
RESTServiceContentModified.IGNORE_LAST_MODIFIED = false;
RESTServiceContentModified.IGNORE_ETAG = true;
RESTServiceContentModified.modificationDate = System.currentTimeMillis() / 1000 * 1000 + 5000;
String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED+":"+destination);
controller.run(2);
// ran twice, got 1...but should have new cached etag
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED+":"+destination, lastMod, Scope.LOCAL);
controller.clearTransferState();
} finally {
// shutdown web service
server.stop();
server.destroy();
}
}
@Test
public final void testContentModifiedTwoServers() throws Exception {
final int port1 = NetworkUtils.availablePort();
final Server server1 = JettyServerUtils.createServer(port1, null, null);
final ServletHandler handler1 = new ServletHandler();
handler1.addServletWithMapping(RESTServiceContentModified.class, "/*");
JettyServerUtils.addHandler(server1, handler1);
final int port2 = NetworkUtils.availablePort();
final Server server2 = JettyServerUtils.createServer(port2, null, null);
final ServletHandler handler2 = new ServletHandler();
handler2.addServletWithMapping(RESTServiceContentModified.class, "/*");
JettyServerUtils.addHandler(server2, handler2);
try {
JettyServerUtils.startServer(server1);
JettyServerUtils.startServer(server2);
// this is the base urls with the random ports
String destination1 = String.format(HTTP_URL, port1);
String destination2 = String.format(HTTP_URL, port2);
// set up NiFi mock controller
controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "5 secs");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination1);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "testFile");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
controller.getStateManager().assertStateNotSet(org.apache.nifi.processors.standard.GetHTTP.ETAG+":"+destination1, Scope.LOCAL);
controller.getStateManager().assertStateNotSet(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED+":"+destination1, Scope.LOCAL);
controller.run(2);
// verify the lastModified and entityTag are updated
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
// ran twice, but got one...which is good
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
controller.clearTransferState();
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination2);
controller.getStateManager().assertStateNotSet(org.apache.nifi.processors.standard.GetHTTP.ETAG+":"+destination2, Scope.LOCAL);
controller.getStateManager().assertStateNotSet(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED+":"+destination2, Scope.LOCAL);
controller.run(2);
// ran twice, but got one...which is good
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
// verify the lastModified's and entityTags are updated
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.ETAG+":"+destination2, "", Scope.LOCAL);
controller.getStateManager().assertStateNotEquals(org.apache.nifi.processors.standard.GetHTTP.LAST_MODIFIED+":"+destination2, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
} finally {
// shutdown web services
server1.stop();
server1.destroy();
server2.stop();
server2.destroy();
}
}
@Test
public final void testUserAgent() throws Exception {
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(UserAgentTestingServlet.class, "/*");
// create the service
final int port = NetworkUtils.availablePort();
Server server = JettyServerUtils.createServer(port, null, null);
server.setHandler(handler);
try {
JettyServerUtils.startServer(server);
final String destination = String.format(HTTP_URL, port);
// set up NiFi mock controller
controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "5 secs");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "testFile");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
controller.run();
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 0);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.USER_AGENT, "testUserAgent");
controller.run();
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
// shutdown web service
} finally {
server.stop();
server.destroy();
}
}
@Test
public final void testDynamicHeaders() throws Exception {
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(UserAgentTestingServlet.class, "/*");
// create the service
final int port = NetworkUtils.availablePort();
Server server = JettyServerUtils.createServer(port, null, null);
server.setHandler(handler);
try {
JettyServerUtils.startServer(server);
final String destination = String.format(HTTP_URL, port);
// set up NiFi mock controller
controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "5 secs");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "testFile");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.USER_AGENT, "testUserAgent");
controller.setProperty("Static-Header", "StaticHeaderValue");
controller.setProperty("EL-Header", "${now()}");
controller.run();
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
// shutdown web service
} finally {
server.stop();
server.destroy();
}
}
@Test
public final void testExpressionLanguage() throws Exception {
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(UserAgentTestingServlet.class, "/*");
// create the service
final int port = NetworkUtils.availablePort();
Server server = JettyServerUtils.createServer(port, null, null);
server.setHandler(handler);
try {
JettyServerUtils.startServer(server);
final String destination = String.format(HTTP_URL, port);
// set up NiFi mock controller
controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "5 secs");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination+"/test_${literal(1)}.pdf");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "test_${now():format('yyyy/MM/dd_HH:mm:ss')}");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.USER_AGENT, "testUserAgent");
controller.run();
controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
MockFlowFile response = controller.getFlowFilesForRelationship(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS).get(0);
response.assertAttributeEquals("gethttp.remote.source","localhost");
String fileName = response.getAttribute(CoreAttributes.FILENAME.key());
assertTrue(fileName.matches("test_\\d\\d\\d\\d/\\d\\d/\\d\\d_\\d\\d:\\d\\d:\\d\\d"));
// shutdown web service
} finally {
server.stop();
server.destroy();
}
}
/**
* Test for HTTP errors
* @throws Exception exception
*/
@Test
public final void testHttpErrors() throws Exception {
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(HttpErrorServlet.class, "/*");
// create the service
final int port = NetworkUtils.availablePort();
Server server = JettyServerUtils.createServer(port, null, null);
server.setHandler(handler);
try {
JettyServerUtils.startServer(server);
final String destination = String.format(HTTP_URL, port);
HttpErrorServlet servlet = (HttpErrorServlet) handler.getServlets()[0].getServlet();
this.controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
this.controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "5 secs");
this.controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination+"/test_${literal(1)}.pdf");
this.controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "test_${now():format('yyyy/MM/dd_HH:mm:ss')}");
this.controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
this.controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.USER_AGENT, "testUserAgent");
// 204 - NO CONTENT
servlet.setErrorToReturn(HttpServletResponse.SC_NO_CONTENT);
this.controller.run();
this.controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 0);
// 404 - NOT FOUND
servlet.setErrorToReturn(HttpServletResponse.SC_NOT_FOUND);
this.controller.run();
this.controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 0);
// 500 - INTERNAL SERVER ERROR
servlet.setErrorToReturn(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
this.controller.run();
this.controller.assertTransferCount(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 0);
} finally {
// shutdown web service
server.stop();
server.destroy();
}
}
@Test
public final void testTlsClientAuthenticationNone() throws Exception {
// set up web service
final ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(HelloWorldServlet.class, "/*");
// create the service, disabling the need for client auth
final int port = NetworkUtils.availablePort();
final Server server = JettyServerUtils.createServer(port, keyStoreSslContext, ClientAuth.NONE);
server.setHandler(handler);
try {
JettyServerUtils.startServer(server);
final String destination = String.format(HTTPS_URL, port);
// set up NiFi mock controller
controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
// Use context service with only a truststore
enableSslContextService(trustStoreSslContext);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "5 secs");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "testFile");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
controller.run();
controller.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
final MockFlowFile mff = controller.getFlowFilesForRelationship(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS).get(0);
mff.assertContentEquals("Hello, World!");
} finally {
server.stop();
server.destroy();
}
}
@Test
public final void testTlsClientAuthenticationRequired() throws Exception {
// set up web service
final ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(HelloWorldServlet.class, "/*");
// create the service, providing both truststore and keystore properties, requiring client auth (default)
final int port = NetworkUtils.availablePort();
final Server server = JettyServerUtils.createServer(port, keyStoreSslContext, ClientAuth.REQUIRED);
server.setHandler(handler);
try {
JettyServerUtils.startServer(server);
final String destination = String.format(HTTPS_URL, port);
// set up NiFi mock controller
controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
// Use context service with a keystore and a truststore
enableSslContextService(keyStoreSslContext);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "10 secs");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "testFile");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
controller.run();
controller.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
final MockFlowFile mff = controller.getFlowFilesForRelationship(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS).get(0);
mff.assertContentEquals("Hello, World!");
} finally {
server.stop();
server.destroy();
}
}
@Test
public final void testCookiePolicy() throws Exception {
final int port1 = NetworkUtils.availablePort();
final Server server1 = JettyServerUtils.createServer(port1, null, null);
final ServletHandler handler1 = new ServletHandler();
handler1.addServletWithMapping(CookieTestingServlet.class, "/*");
JettyServerUtils.addHandler(server1, handler1);
final int port2 = NetworkUtils.availablePort();
final Server server2 = JettyServerUtils.createServer(port2, null, null);
final ServletHandler handler2 = new ServletHandler();
handler2.addServletWithMapping(CookieVerificationTestingServlet.class, "/*");
JettyServerUtils.addHandler(server2, handler2);
try {
JettyServerUtils.startServer(server1);
JettyServerUtils.startServer(server2);
// this is the base urls with the random ports
String destination1 = String.format(HTTP_URL, port1);
String destination2 = String.format(HTTP_URL, port2);
// set up NiFi mock controller
controller = TestRunners.newTestRunner(org.apache.nifi.processors.standard.GetHTTP.class);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.CONNECTION_TIMEOUT, "5 secs");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination1 + "/?redirect=" + URLEncoder.encode(destination2, "UTF-8")
+ "&datemode=" + CookieTestingServlet.DATEMODE_COOKIE_DEFAULT);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FILENAME, "testFile");
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.FOLLOW_REDIRECTS, "true");
controller.run(1);
// verify default cookie data does successful redirect
controller.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
MockFlowFile ff = controller.getFlowFilesForRelationship(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS).get(0);
ff.assertContentEquals("Hello, World!");
controller.clearTransferState();
// verify NON-standard cookie data fails with default redirect_cookie_policy
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination1 + "/?redirect=" + URLEncoder.encode(destination2, "UTF-8")
+ "&datemode=" + CookieTestingServlet.DATEMODE_COOKIE_NOT_TYPICAL);
controller.run(1);
controller.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 0);
controller.clearTransferState();
// change GetHTTP to place it in STANDARD cookie policy mode
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.REDIRECT_COOKIE_POLICY, org.apache.nifi.processors.standard.GetHTTP.STANDARD_COOKIE_POLICY_STR);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.URL, destination1 + "/?redirect=" + URLEncoder.encode(destination2, "UTF-8")
+ "&datemode=" + CookieTestingServlet.DATEMODE_COOKIE_NOT_TYPICAL);
controller.run(1);
// verify NON-standard cookie data does successful redirect
controller.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS, 1);
ff = controller.getFlowFilesForRelationship(org.apache.nifi.processors.standard.GetHTTP.REL_SUCCESS).get(0);
ff.assertContentEquals("Hello, World!");
} finally {
// shutdown web services
server1.stop();
server1.destroy();
server2.stop();
server2.destroy();
}
}
private void enableSslContextService(final SSLContext configuredSslContext) throws InitializationException {
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn(SSL_CONTEXT_IDENTIFIER);
Mockito.when(sslContextService.createContext()).thenReturn(configuredSslContext);
controller.addControllerService(SSL_CONTEXT_IDENTIFIER, sslContextService);
controller.enableControllerService(sslContextService);
controller.setProperty(org.apache.nifi.processors.standard.GetHTTP.SSL_CONTEXT_SERVICE, SSL_CONTEXT_IDENTIFIER);
}
}

View File

@ -1,535 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.FlowFileUnpackagerV3;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.JettyServerUtils;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletHandler;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
/**
* Integration Test for deprecated PostHTTP Processor
*/
@SuppressWarnings("deprecation")
public class ITPostHTTP {
private Server server;
private TestRunner runner;
private CaptureServlet servlet;
private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
private static final String TEST_MESSAGE = String.class.getName();
private static SSLContext keyStoreSslContext;
private static SSLContext trustStoreSslContext;
@BeforeClass
public static void configureServices() throws TlsException {
keyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
trustStoreSslContext = SslContextUtils.createTrustStoreSslContext();
}
private static String getUrl(final SSLContext sslContext, final int port) {
final String protocol = sslContext == null ? "http" : "https";
return String.format("%s://localhost:%d", protocol, port);
}
private void setup(final SSLContext serverSslContext, final ClientAuth clientAuth) throws Exception {
runner = TestRunners.newTestRunner(org.apache.nifi.processors.standard.PostHTTP.class);
final int port = NetworkUtils.availablePort();
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.URL, getUrl(serverSslContext, port));
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(CaptureServlet.class, "/*");
final Server configuredServer = JettyServerUtils.createServer(port, serverSslContext, clientAuth);
configuredServer.setHandler(handler);
final ServerConnector connector = new ServerConnector(configuredServer);
connector.setPort(port);
JettyServerUtils.startServer(configuredServer);
servlet = (CaptureServlet) handler.getServlets()[0].getServlet();
}
@After
public void cleanup() throws Exception {
if (server != null) {
server.stop();
server.destroy();
server = null;
}
}
@Test
public void testUnauthenticatedTls() throws Exception {
setup(keyStoreSslContext, ClientAuth.NONE);
enableSslContextService(trustStoreSslContext);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, Boolean.FALSE.toString());
runner.enqueue(TEST_MESSAGE);
runner.run();
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS, 1);
}
@Test
public void testMutualTls() throws Exception {
setup(keyStoreSslContext, ClientAuth.REQUIRED);
enableSslContextService(keyStoreSslContext);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, Boolean.FALSE.toString());
runner.enqueue(TEST_MESSAGE);
runner.run();
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS, 1);
}
@Test
public void testMutualTlsClientCertificateMissing() throws Exception {
setup(keyStoreSslContext, ClientAuth.REQUIRED);
enableSslContextService(trustStoreSslContext);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, Boolean.FALSE.toString());
runner.enqueue(TEST_MESSAGE);
runner.run();
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_FAILURE, 1);
}
@Test
public void testSendAsFlowFile() throws Exception {
setup( null, null);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.SEND_AS_FLOWFILE, "true");
final Map<String, String> attrs = new HashMap<>();
attrs.put("abc", "cba");
runner.enqueue(TEST_MESSAGE, attrs);
attrs.put("abc", "abc");
attrs.put("filename", "xyz.txt");
runner.enqueue("World".getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
final byte[] lastPost = servlet.getLastPost();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ByteArrayInputStream bais = new ByteArrayInputStream(lastPost);
FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3();
// unpack first flowfile received
Map<String, String> receivedAttrs = unpacker.unpackageFlowFile(bais, baos);
byte[] contentReceived = baos.toByteArray();
assertEquals(TEST_MESSAGE, new String(contentReceived));
assertEquals("cba", receivedAttrs.get("abc"));
assertTrue(unpacker.hasMoreData());
baos.reset();
receivedAttrs = unpacker.unpackageFlowFile(bais, baos);
contentReceived = baos.toByteArray();
assertEquals("World", new String(contentReceived));
assertEquals("abc", receivedAttrs.get("abc"));
assertEquals("xyz.txt", receivedAttrs.get("filename"));
Assert.assertNull(receivedAttrs.get("Content-Length"));
}
@Test
public void testMutualTlsSendFlowFile() throws Exception {
setup(keyStoreSslContext, ClientAuth.REQUIRED);
enableSslContextService(keyStoreSslContext);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.SEND_AS_FLOWFILE, "true");
final Map<String, String> attrs = new HashMap<>();
attrs.put("abc", "cba");
runner.enqueue(TEST_MESSAGE, attrs);
attrs.put("abc", "abc");
attrs.put("filename", "xyz.txt");
runner.enqueue("World".getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
final byte[] lastPost = servlet.getLastPost();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ByteArrayInputStream bais = new ByteArrayInputStream(lastPost);
FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3();
// unpack first flowfile received
Map<String, String> receivedAttrs = unpacker.unpackageFlowFile(bais, baos);
byte[] contentReceived = baos.toByteArray();
assertEquals(TEST_MESSAGE, new String(contentReceived));
assertEquals("cba", receivedAttrs.get("abc"));
assertTrue(unpacker.hasMoreData());
baos.reset();
receivedAttrs = unpacker.unpackageFlowFile(bais, baos);
contentReceived = baos.toByteArray();
assertEquals("World", new String(contentReceived));
assertEquals("abc", receivedAttrs.get("abc"));
assertEquals("xyz.txt", receivedAttrs.get("filename"));
}
@Test
public void testSendWithMimeType() throws Exception {
setup(null, null);
final Map<String, String> attrs = new HashMap<>();
final String suppliedMimeType = "text/plain";
attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
runner.enqueue("Camping is great!".getBytes(), attrs);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, Boolean.FALSE.toString());
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE_HEADER));
Assert.assertEquals("17",lastPostHeaders.get("Content-Length"));
}
@Test
public void testSendWithEmptyELExpression() throws Exception {
setup( null, null);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, Boolean.FALSE.toString());
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "");
runner.enqueue("The wilderness.".getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(org.apache.nifi.processors.standard.PostHTTP.DEFAULT_CONTENT_TYPE, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE_HEADER));
}
@Test
public void testSendWithContentTypeProperty() throws Exception {
setup(null, null);
final String suppliedMimeType = "text/plain";
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, Boolean.FALSE.toString());
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
runner.enqueue("Sending with content type property.".getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE_HEADER));
}
@Test
public void testSendWithCompressionServerAcceptGzip() throws Exception {
setup(null, null);
final String suppliedMimeType = "text/plain";
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.COMPRESSION_LEVEL, "9");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that a 'Content-Encoding' header was set with a 'gzip' value
Assert.assertEquals(org.apache.nifi.processors.standard.PostHTTP.CONTENT_ENCODING_GZIP_VALUE, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_ENCODING_HEADER));
Assert.assertNull(lastPostHeaders.get("Content-Length"));
}
@Test
public void testSendWithoutCompressionServerAcceptGzip() throws Exception {
setup(null, null);
final String suppliedMimeType = "text/plain";
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.COMPRESSION_LEVEL, "0");
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, "false");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that the request was not sent with a 'Content-Encoding' header
Assert.assertNull(lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_ENCODING_HEADER));
Assert.assertEquals("2100",lastPostHeaders.get("Content-Length"));
}
@Test
public void testSendWithCompressionServerNotAcceptGzip() throws Exception {
setup(null, null);
final String suppliedMimeType = "text/plain";
// Specify a property to the URL to have the CaptureServlet specify it doesn't accept gzip
final String serverUrl = runner.getProcessContext().getProperty(PostHTTP.URL).getValue();
final String url = String.format("%s?acceptGzip=false", serverUrl);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.URL, url);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.COMPRESSION_LEVEL, "9");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that the request was not sent with a 'Content-Encoding' header
Assert.assertNull(lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_ENCODING_HEADER));
}
@Test
public void testSendChunked() throws Exception {
setup(null, null);
final String suppliedMimeType = "text/plain";
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, Boolean.TRUE.toString());
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
byte[] postValue = servlet.getLastPost();
Assert.assertArrayEquals(StringUtils.repeat("Lines of sample text.", 100).getBytes(),postValue);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE_HEADER));
Assert.assertNull(lastPostHeaders.get("Content-Length"));
Assert.assertEquals("chunked",lastPostHeaders.get("Transfer-Encoding"));
}
@Test
public void testSendWithThrottler() throws Exception {
setup(null, null);
final String suppliedMimeType = "text/plain";
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.CHUNKED_ENCODING, "false");
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.MAX_DATA_RATE, "10kb");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is a line of sample text. Here is another.", 100).getBytes(), attrs);
boolean stopOnFinish = true;
runner.run(1, stopOnFinish);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
byte[] postValue = servlet.getLastPost();
Assert.assertArrayEquals(StringUtils.repeat("This is a line of sample text. Here is another.", 100).getBytes(),postValue);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(org.apache.nifi.processors.standard.PostHTTP.CONTENT_TYPE_HEADER));
Assert.assertEquals("4700",lastPostHeaders.get("Content-Length"));
}
@Test
public void testDefaultUserAgent() throws Exception {
setup(null, null);
Assert.assertTrue(runner.getProcessContext().getProperty(org.apache.nifi.processors.standard.PostHTTP.USER_AGENT).getValue().startsWith("Apache-HttpClient"));
}
@Test
public void testBatchWithMultipleUrls() throws Exception {
setup(null,null);
final CaptureServlet servletA = servlet;
final String urlA = runner.getProcessContext().getProperty(org.apache.nifi.processors.standard.PostHTTP.URL).getValue();
// set up second web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(CaptureServlet.class, "/*");
// create the second service
final int portB = NetworkUtils.availablePort();
final String urlB = getUrl(null, portB);
final Server serverB = JettyServerUtils.createServer(portB, null, null);
serverB.setHandler(handler);
JettyServerUtils.startServer(serverB);
final CaptureServlet servletB = (CaptureServlet) handler.getServlets()[0].getServlet();
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.URL, "${url}"); // use EL for the URL
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.SEND_AS_FLOWFILE, "true");
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.MAX_BATCH_SIZE, "10 b");
Set<String> expectedContentA = new HashSet<>();
Set<String> expectedContentB = new HashSet<>();
Set<String> actualContentA = new HashSet<>();
Set<String> actualContentB = new HashSet<>();
// enqueue 9 FlowFiles
for (int i = 0; i < 9; i++) {
enqueueWithURL("a" + i, urlA);
enqueueWithURL("b" + i, urlB);
expectedContentA.add("a" + i);
expectedContentB.add("b" + i);
}
// MAX_BATCH_SIZE is 10 bytes, each file is 2 bytes, so 18 files should produce 4 batches
for (int i = 0; i < 4; i++) {
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
assertFalse(successFiles.isEmpty());
MockFlowFile mff = successFiles.get(0);
final String urlAttr = mff.getAttribute("url");
if (urlA.equals(urlAttr)) {
checkBatch(urlA, servletA, actualContentA, (actualContentA.isEmpty() ? 5 : 4));
} else if (urlB.equals(urlAttr)) {
checkBatch(urlB, servletB, actualContentB, (actualContentB.isEmpty() ? 5 : 4));
} else {
fail("unexpected url attribute");
}
}
assertEquals(expectedContentA, actualContentA);
assertEquals(expectedContentB, actualContentB);
// make sure everything transferred, nothing more to do
runner.run(1);
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS, 0);
}
private void enqueueWithURL(String data, String url) {
final Map<String, String> attrs = new HashMap<>();
attrs.put("url", url);
runner.enqueue(data.getBytes(), attrs);
}
private void checkBatch(final String url, CaptureServlet servlet, Set<String> actualContent, int expectedCount) throws Exception {
FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3();
Set<String> actualFFContent = new HashSet<>();
Set<String> actualPostContent = new HashSet<>();
runner.assertAllFlowFilesTransferred(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS, expectedCount);
// confirm that all FlowFiles transferred to 'success' have the same URL
// also accumulate content to verify later
final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(org.apache.nifi.processors.standard.PostHTTP.REL_SUCCESS);
for (int i = 0; i < expectedCount; i++) {
MockFlowFile mff = successFlowFiles.get(i);
mff.assertAttributeEquals("url", url);
String content = new String(mff.toByteArray());
actualFFContent.add(content);
}
// confirm that all FlowFiles POSTed to server have the same URL
// also accumulate content to verify later
try (ByteArrayInputStream bais = new ByteArrayInputStream(servlet.getLastPost());
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
for (int i = 0; i < expectedCount; i++) {
Map<String, String> receivedAttrs = unpacker.unpackageFlowFile(bais, baos);
final byte[] bytesReceived = baos.toByteArray();
String receivedContent = new String(bytesReceived, StandardCharsets.UTF_8);
actualPostContent.add(receivedContent);
assertEquals(url, receivedAttrs.get("url"));
assertTrue(unpacker.hasMoreData() || i == (expectedCount - 1));
baos.reset();
}
}
// confirm that the transferred and POSTed content match
assertEquals(actualFFContent, actualPostContent);
// accumulate actual content
actualContent.addAll(actualPostContent);
runner.clearTransferState();
}
private void enableSslContextService(final SSLContext configuredSslContext) throws InitializationException {
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn(SSL_CONTEXT_IDENTIFIER);
Mockito.when(sslContextService.createContext()).thenReturn(configuredSslContext);
runner.addControllerService(SSL_CONTEXT_IDENTIFIER, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(org.apache.nifi.processors.standard.PostHTTP.SSL_CONTEXT_SERVICE, SSL_CONTEXT_IDENTIFIER);
}
}

View File

@ -1,39 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-standard-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.16.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-standard-web-test-utils</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.util;
import org.apache.nifi.security.util.ClientAuth;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import javax.net.ssl.SSLContext;
public class JettyServerUtils {
private static final long IDLE_TIMEOUT = 60000;
private static final long SERVER_START_SLEEP = 100L;
public static Server createServer(final int port, final SSLContext sslContext, final ClientAuth clientAuth) {
final Server server = new Server();
final ServerConnector connector;
if (sslContext == null) {
connector = new ServerConnector(server);
} else {
final SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setSslContext(sslContext);
if (ClientAuth.REQUIRED.equals(clientAuth)) {
sslContextFactory.setNeedClientAuth(true);
}
connector = new ServerConnector(server, sslContextFactory);
}
connector.setPort(port);
connector.setIdleTimeout(IDLE_TIMEOUT);
server.addConnector(connector);
final HandlerCollection handlerCollection = new HandlerCollection(true);
server.setHandler(handlerCollection);
return server;
}
public static void startServer(final Server server) throws Exception {
server.start();
while (!server.isStarted()) {
Thread.sleep(SERVER_START_SLEEP);
}
}
public static void addHandler(final Server server, final Handler handler) {
final Handler serverHandler = server.getHandler();
if (serverHandler instanceof HandlerCollection) {
final HandlerCollection handlerCollection = (HandlerCollection) serverHandler;
handlerCollection.addHandler(handler);
}
}
public static void clearHandlers(final Server server) {
final Handler serverHandler = server.getHandler();
if (serverHandler instanceof HandlerCollection) {
final HandlerCollection handlerCollection = (HandlerCollection) serverHandler;
final Handler[] handlers = handlerCollection.getHandlers();
if (handlers != null) {
for (final Handler handler : handlerCollection.getHandlers()) {
handlerCollection.removeHandler(handler);
}
}
}
}
}

View File

@ -1,159 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.util;
import java.util.Map;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.util.ssl.SslContextFactory;
/**
* Test server to assist with unit tests that requires a server to be stood up.
*/
public class TestServer {
public static final String NEED_CLIENT_AUTH = "clientAuth";
private static final long IDLE_TIMEOUT = 60000;
private Server jetty;
private boolean secure = false;
/**
* Creates the test server.
*/
public TestServer() {
createServer(null);
}
/**
* Creates the test server.
*
* @param sslProperties SSLProps to be used in the secure connection. The keys should should use the StandardSSLContextService properties.
*/
public TestServer(final Map<String, String> sslProperties) {
createServer(sslProperties);
}
private void createServer(final Map<String, String> sslProperties) {
jetty = new Server();
// create the unsecure connector
createConnector();
// create the secure connector if sslProperties are specified
if (sslProperties != null) {
createSecureConnector(sslProperties);
}
jetty.setHandler(new HandlerCollection(true));
}
/**
* Creates the http connection
*/
private void createConnector() {
final ServerConnector http = new ServerConnector(jetty);
http.setPort(0);
// Severely taxed environments may have significant delays when executing.
http.setIdleTimeout(IDLE_TIMEOUT);
jetty.addConnector(http);
}
private void createSecureConnector(final Map<String, String> sslProperties) {
SslContextFactory.Server ssl = new SslContextFactory.Server();
if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
}
if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
}
final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
if (clientAuth == null) {
ssl.setNeedClientAuth(true);
} else {
ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth));
}
// build the connector
final ServerConnector https = new ServerConnector(jetty, ssl);
// set host and port
https.setPort(0);
// Severely taxed environments may have significant delays when executing.
https.setIdleTimeout(IDLE_TIMEOUT);
// add the connector
jetty.addConnector(https);
// mark secure as enabled
secure = true;
}
public void clearHandlers() {
JettyServerUtils.clearHandlers(jetty);
}
public void addHandler(final Handler handler) {
JettyServerUtils.addHandler(jetty, handler);
}
public void startServer() throws Exception {
jetty.start();
}
public void shutdownServer() throws Exception {
jetty.stop();
jetty.destroy();
}
public int getPort() {
if (!jetty.isStarted()) {
throw new IllegalStateException("Jetty server not started");
}
return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
}
public int getSecurePort() {
if (!jetty.isStarted()) {
throw new IllegalStateException("Jetty server not started");
}
return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort();
}
public String getUrl() {
return "http://localhost:" + getPort();
}
public String getSecureUrl() {
String url = null;
if (secure) {
url = "https://localhost:" + getSecurePort();
}
return url;
}
}

View File

@ -30,7 +30,6 @@
<module>nifi-standard-nar</module> <module>nifi-standard-nar</module>
<module>nifi-jolt-transform-json-ui</module> <module>nifi-jolt-transform-json-ui</module>
<module>nifi-standard-utils</module> <module>nifi-standard-utils</module>
<module>nifi-standard-web-test-utils</module>
</modules> </modules>
<properties> <properties>
<yammer.metrics.version>2.2.0</yammer.metrics.version> <yammer.metrics.version>2.2.0</yammer.metrics.version>

View File

@ -166,12 +166,6 @@
<artifactId>javax.servlet-api</artifactId> <artifactId>javax.servlet-api</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.eclipse.jetty</groupId> <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId> <artifactId>jetty-servlet</artifactId>

View File

@ -1,365 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup
import org.apache.avro.Schema
import org.apache.nifi.avro.AvroTypeUtil
import org.apache.nifi.json.JsonTreeReader
import org.apache.nifi.lookup.rest.SchemaUtil
import org.apache.nifi.lookup.rest.handlers.BasicAuth
import org.apache.nifi.lookup.rest.handlers.ComplexJson
import org.apache.nifi.lookup.rest.handlers.NoRecord
import org.apache.nifi.lookup.rest.handlers.SimpleJson
import org.apache.nifi.lookup.rest.handlers.SimpleJsonArray
import org.apache.nifi.lookup.rest.handlers.VerbTest
import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.apache.nifi.web.util.TestServer
import org.eclipse.jetty.servlet.ServletHandler
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
class RestLookupServiceIT {
static final JsonTreeReader reader
static final MockSchemaRegistry registry = new MockSchemaRegistry()
static final RecordSchema simpleSchema
static final RecordSchema nestedSchema
TestServer server
TestRunner runner
RestLookupService lookupService
static {
simpleSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(SchemaUtil.SIMPLE))
nestedSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(SchemaUtil.COMPLEX))
registry.addSchema("simple", simpleSchema)
registry.addSchema("complex", nestedSchema)
reader = new JsonTreeReader()
}
@Before
void setup() {
lookupService = new RestLookupService()
runner = TestRunners.newTestRunner(TestRestLookupServiceProcessor.class)
runner.addControllerService("jsonReader", reader)
runner.addControllerService("registry", registry)
runner.addControllerService("lookupService", lookupService)
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
runner.setProperty(lookupService, RestLookupService.RECORD_READER, "jsonReader")
runner.setProperty(TestRestLookupServiceProcessor.CLIENT_SERVICE, "lookupService")
runner.enableControllerService(registry)
runner.enableControllerService(reader)
}
@Test
void basicAuth() {
runner.setProperty(lookupService, RestLookupService.PROP_BASIC_AUTH_USERNAME, "john.smith")
runner.setProperty(lookupService, RestLookupService.PROP_BASIC_AUTH_PASSWORD, "testing1234")
TestServer server = new TestServer()
server.addHandler(new BasicAuth())
try {
server.startServer()
setEndpoint(server.port, '/${schema.name}')
def coordinates = [
"mime.type": "application/json",
"request.method": "get"
]
def context = [ "schema.name": "simple" ]
Optional<Record> response = lookupService.lookup(coordinates, context)
Assert.assertTrue(response.isPresent())
def record = response.get()
Assert.assertEquals("john.smith", record.getAsString("username"))
Assert.assertEquals("testing1234", record.getAsString("password"))
Throwable t
try {
runner.disableControllerService(lookupService)
runner.setProperty(lookupService, RestLookupService.PROP_BASIC_AUTH_USERNAME, "john.smith2")
runner.setProperty(lookupService, RestLookupService.PROP_BASIC_AUTH_PASSWORD, ":wetadfasdfadf")
runner.enableControllerService(lookupService)
lookupService.lookup(coordinates)
} catch (Throwable lfe) {
t = lfe
}
Assert.assertNotNull(t)
Assert.assertTrue(t.getClass().getCanonicalName(), t instanceof LookupFailureException)
} finally {
server.shutdownServer()
}
}
@Test
void simpleJson() {
TestServer server = new TestServer()
ServletHandler handler = new ServletHandler()
handler.addServletWithMapping(SimpleJson.class, "/simple")
server.addHandler(handler)
try {
server.startServer()
setEndpoint(server.port, "/simple")
def coordinates = [
"mime.type": "application/json",
"request.method": "get"
]
def context = [ "schema.name": "simple" ]
Optional<Record> response = lookupService.lookup(coordinates, context)
Assert.assertTrue(response.isPresent())
def record = response.get()
Assert.assertEquals("john.smith", record.getAsString("username"))
Assert.assertEquals("testing1234", record.getAsString("password"))
} finally {
server.shutdownServer()
}
}
@Test
void noRecord() {
TestServer server = new TestServer()
ServletHandler handler = new ServletHandler()
handler.addServletWithMapping(NoRecord.class, "/simple")
server.addHandler(handler)
try {
server.startServer()
setEndpoint(server.port, "/simple")
def coordinates = [
"mime.type": "application/json",
"request.method": "get"
]
def context = [ "schema.name": "simple" ]
Optional<Record> response = lookupService.lookup(coordinates, context)
Assert.assertTrue(response.isPresent())
def record = response.get()
Assert.assertNull(record.getAsString("username"))
Assert.assertNull(record.getAsString("password"))
} finally {
server.shutdownServer()
}
}
@Test
void simpleJsonArray() {
TestServer server = new TestServer()
ServletHandler handler = new ServletHandler()
handler.addServletWithMapping(SimpleJsonArray.class, "/simple_array")
server.addHandler(handler)
try {
server.startServer()
setEndpoint(server.port, "/simple_array")
def coordinates = [
"mime.type": "application/json",
"request.method": "get"
]
def context = [ "schema.name": "simple" ]
Optional<Record> response = lookupService.lookup(coordinates, context)
Assert.assertTrue(response.isPresent())
def record = response.get()
Assert.assertEquals("john.smith", record.getAsString("username"))
Assert.assertEquals("testing1234", record.getAsString("password"))
} finally {
server.shutdownServer()
}
}
@Test
void testHeaders() {
runner.setProperty(lookupService, "X-USER", '${x.user}')
runner.setProperty(lookupService, "X-PASS", 'testing7890')
TestServer server = new TestServer()
ServletHandler handler = new ServletHandler()
handler.addServletWithMapping(SimpleJson.class, "/simple")
server.addHandler(handler)
try {
server.startServer()
setEndpoint(server.port, "/simple")
def coordinates = [
"mime.type": "application/json",
"request.method": "get"
]
def context = [ 'schema.name': 'simple' , 'x.user': 'jane.doe']
Optional<Record> response = lookupService.lookup(coordinates, context)
Assert.assertTrue(response.isPresent())
def record = response.get()
Assert.assertEquals("jane.doe", record.getAsString("username"))
Assert.assertEquals("testing7890", record.getAsString("password"))
} finally {
server.shutdownServer()
}
}
@Test
void complexJson() {
runner.setProperty(lookupService, RestLookupService.RECORD_PATH, "/top/middle/inner")
TestServer server = new TestServer()
ServletHandler handler = new ServletHandler()
handler.addServletWithMapping(ComplexJson.class, "/complex")
server.addHandler(handler)
try {
server.startServer()
setEndpoint(server.port, "/complex")
def coordinates = [
"mime.type": "application/json",
"request.method": "get"
]
def context = [ "schema.name": "complex" ]
Optional<Record> response = lookupService.lookup(coordinates, context)
Assert.assertTrue(response.isPresent())
def record = response.get()
Assert.assertEquals("jane.doe", record.getAsString("username"))
Assert.assertEquals("testing7890", record.getAsString("password"))
Assert.assertEquals("jane.doe@test-example.com", record.getAsString("email"))
} finally {
server.shutdownServer()
}
}
@Test
void testOtherVerbs() {
TestServer server = new TestServer()
ServletHandler handler = new ServletHandler()
handler.addServletWithMapping(VerbTest.class, "/simple")
server.addHandler(handler)
try {
server.startServer()
setEndpoint(server.port, "/simple")
def validation = { String verb, boolean addBody, boolean addMimeType, boolean valid ->
def coordinates = [
"mime.type" : addMimeType ? "application/json" : null,
"request.method": verb
]
def context = [ "schema.name": "simple" ]
if (addBody) {
coordinates["request.body"] = prettyPrint(toJson([ msg: "Hello, world" ]))
}
try {
Optional<Record> response = lookupService.lookup(coordinates, context)
if (!valid) {
Assert.fail("Validation should fail.")
}
Assert.assertTrue(response.isPresent())
def record = response.get()
Assert.assertEquals("john.smith", record.getAsString("username"))
Assert.assertEquals("testing1234", record.getAsString("password"))
} catch (LookupFailureException e) {
if (valid) {
Assert.fail("Validation should be successful.")
}
}
}
// Delete does not require body nor mimeType.
validation("delete", false, false, true)
// Post and Put require body and mimeType.
["post", "put"].each { verb ->
validation(verb, false, false, false)
validation(verb, true, false, false)
validation(verb, true, true, true)
}
} finally {
server.shutdownServer()
}
}
@Test
void testTemplateMode() {
TestServer server = new TestServer()
ServletHandler handler = new ServletHandler()
handler.addServletWithMapping(SimpleJson.class, "/simple/john.smith/friends/12345")
server.addHandler(handler)
try {
server.startServer()
setEndpoint(server.port, '/simple/${user.name}/friends/${friend.id}')
def coordinates = [
"mime.type": "application/json",
"request.method": "get",
"user.name": "john.smith",
"friend.id": 12345,
"endpoint.template": true
]
def context = [ "schema.name": "simple" ]
Optional<Record> response = lookupService.lookup(coordinates, context)
Assert.assertTrue(response.isPresent())
def record = response.get()
Assert.assertEquals("john.smith", record.getAsString("username"))
Assert.assertEquals("testing1234", record.getAsString("password"))
} finally {
server.shutdownServer()
}
}
void setEndpoint(Integer serverPort, String endpoint) {
// Resolve environmental part of the URL via variable registry.
runner.setVariable("serverPort", String.valueOf(serverPort))
runner.setProperty(lookupService, RestLookupService.URL, "http://localhost:${serverPort}" + endpoint)
runner.enableControllerService(lookupService)
runner.assertValid()
}
}

View File

@ -64,35 +64,29 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId> <artifactId>log4j-over-slf4j</artifactId>
</dependency> </dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>com.squareup.okhttp3</groupId> <groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId> <artifactId>okhttp</artifactId>
<version>${okhttp.version}</version> <version>${okhttp.version}</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version> <version>${jackson.version}</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -17,26 +17,19 @@
package org.apache.nifi.oauth2; package org.apache.nifi.oauth2;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.SystemUtils; import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.TestServer;
import org.eclipse.jetty.server.Request;
import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.eclipse.jetty.server.handler.AbstractHandler;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -44,38 +37,20 @@ import java.util.UUID;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class OAuth2TokenProviderImplTest { public class OAuth2TokenProviderImplTest {
private TestRunner runner; private TestRunner runner;
private static TestServer server;
private static String url; private MockWebServer mockWebServer;
private OAuth2TokenProvider oAuth2TokenProvider; private OAuth2TokenProvider oAuth2TokenProvider;
private static FakeOAuth2Server handler;
@BeforeClass
public static void beforeClass() throws Exception {
Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
// useful for verbose logging output
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
// create a Jetty server on a random port
server = new TestServer();
server.startServer();
// this is the base url with the random port
url = server.getUrl();
handler = new FakeOAuth2Server();
server.addHandler(handler);
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
mockWebServer = new MockWebServer();
final String url = mockWebServer.url("/").toString();
runner = TestRunners.newTestRunner(new AbstractProcessor() { runner = TestRunners.newTestRunner(new AbstractProcessor() {
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@ -90,105 +65,61 @@ public class OAuth2TokenProviderImplTest {
} }
@Test @Test
public void testClientCredentialGrant() { public void testClientCredentialGrant() throws AccessTokenAcquisitionException, JsonProcessingException {
Exception ex = null; enqueueTokenResponse();
AccessToken token = null; final AccessToken token = oAuth2TokenProvider.getAccessTokenByClientCredentials(
try { "test-client",
token = oAuth2TokenProvider.getAccessTokenByClientCredentials( UUID.randomUUID().toString()
"test-client", );
UUID.randomUUID().toString() assertAccessTokenFound(token);
);
} catch (AccessTokenAcquisitionException e) {
ex = e;
} finally {
commonTest(ex, token);
}
} }
@Test @Test
public void testErrorHandler() { public void testErrorHandler() {
Exception ex = null; mockWebServer.enqueue(new MockResponse().setResponseCode(500));
assertThrows(AccessTokenAcquisitionException.class, () -> oAuth2TokenProvider.getAccessTokenByClientCredentials(
try { "test-client",
handler.setThrowException(true); UUID.randomUUID().toString()
oAuth2TokenProvider.getAccessTokenByClientCredentials( ));
"test-client",
UUID.randomUUID().toString()
);
} catch (AccessTokenAcquisitionException e) {
ex = e;
} finally {
handler.setThrowException(false);
assertTrue(ex instanceof AccessTokenAcquisitionException);
}
} }
@Test @Test
public void testPasswordGrant() { public void testPasswordGrant() throws AccessTokenAcquisitionException, JsonProcessingException {
Exception ex = null; enqueueTokenResponse();
AccessToken token = null; final AccessToken token = oAuth2TokenProvider.getAccessTokenByPassword(
try { "test-client",
token = oAuth2TokenProvider.getAccessTokenByPassword( UUID.randomUUID().toString(),
"test-client", "user",
UUID.randomUUID().toString(), "password"
"user", );
"password" assertAccessTokenFound(token);
);
} catch (AccessTokenAcquisitionException e) {
ex = e;
} finally {
commonTest(ex, token);
}
} }
@Test @Test
public void testRefreshToken() { public void testRefreshToken() throws AccessTokenAcquisitionException, JsonProcessingException {
Exception ex = null; enqueueTokenResponse();
AccessToken token = null; final AccessToken token = oAuth2TokenProvider.refreshToken(
try { new AccessToken("token", "refresh", "BEARER", 300, "test")
token = oAuth2TokenProvider.refreshToken( );
new AccessToken("token", "refresh", "BEARER", 300, "test") assertAccessTokenFound(token);
);
} catch (AccessTokenAcquisitionException e) {
ex = e;
} finally {
commonTest(ex, token);
}
} }
private void commonTest(Exception ex, AccessToken token) { private void assertAccessTokenFound(final AccessToken accessToken) {
assertNull(ex); assertNotNull(accessToken);
assertNotNull(token); assertEquals("access token", accessToken.getAccessToken());
assertEquals("access token", token.getAccessToken()); assertEquals(300, accessToken.getExpires().intValue());
assertEquals(300, token.getExpires().intValue()); assertEquals("BEARER", accessToken.getTokenType());
assertEquals("BEARER", token.getTokenType()); assertFalse(accessToken.isExpired());
assertFalse(token.isExpired());
} }
public static final class FakeOAuth2Server extends AbstractHandler { private void enqueueTokenResponse() throws JsonProcessingException {
boolean throwException = false; final Map<String, Object> token = new HashMap<>();
token.put("access_token", "access token");
public void setThrowException(boolean throwException) { token.put("refresh_token", "refresh token");
this.throwException = throwException; token.put("token_type", "BEARER");
} token.put("expires_in", 300);
token.put("scope", "test scope");
@Override final String accessToken = new ObjectMapper().writeValueAsString(token);
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { mockWebServer.enqueue(new MockResponse().setResponseCode(200).addHeader("Content-Type", "application/json").setBody(accessToken));
baseRequest.setHandled(true);
if (throwException) {
response.setStatus(500);
} else {
Map<String, Object> token = new HashMap<>();
token.put("access_token", "access token");
token.put("refresh_token", "refresh token");
token.put("token_type", "BEARER");
token.put("expires_in", 300);
token.put("scope", "test scope");
response.setContentType("application/json");
response.getWriter().write(new ObjectMapper().writeValueAsString(token));
}
}
} }
} }