NIFI-9520 update AWS SDK and Kinesis Client Library versions; Allow PutS3Object to use all available StorageClasses

This closes #5632.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Chris Sampson 2022-01-05 12:32:23 +00:00 committed by Peter Turcsanyi
parent dc42012755
commit bd1b2b4d69
4 changed files with 306 additions and 362 deletions

View File

@ -35,10 +35,13 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.Tag;
@ -140,6 +143,10 @@ public class PutS3Object extends AbstractS3Processor {
public static final String CONTENT_DISPOSITION_INLINE = "inline";
public static final String CONTENT_DISPOSITION_ATTACHMENT = "attachment";
private static final Set<String> STORAGE_CLASSES = Collections.unmodifiableSortedSet(new TreeSet<>(
Arrays.stream(StorageClass.values()).map(StorageClass::name).collect(Collectors.toSet())
));
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
.name("Expiration Time Rule")
.required(false)
@ -183,9 +190,7 @@ public class PutS3Object extends AbstractS3Processor {
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
.name("Storage Class")
.required(true)
.allowableValues(StorageClass.Standard.name(), StorageClass.IntelligentTiering.name(), StorageClass.StandardInfrequentAccess.name(),
StorageClass.OneZoneInfrequentAccess.name(), StorageClass.Glacier.name(), StorageClass.DeepArchive.name(),
StorageClass.ReducedRedundancy.name(), StorageClass.Outposts.name())
.allowableValues(STORAGE_CLASSES)
.defaultValue(StorageClass.Standard.name())
.build();

View File

@ -18,21 +18,20 @@
package org.apache.nifi.processors.aws.wag;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@ -69,7 +68,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
public void setupCredFile() {
runner.setProperty(AbstractAWSProcessor.CREDENTIALS_FILE,
runner.setProperty(AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE,
"src/test/resources/mock-aws-credentials.properties");
}
@ -107,7 +106,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
@ -115,21 +114,8 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// expected in response of each message
// status code, status message, all headers from server response --> ff attributes
// server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes("UTF-8"));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "application/json");
final MockFlowFile bundle2 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(1);
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes("UTF-8"));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "application/json");
assert200Response(runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0), true);
assert200Response(runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(1), false);
final List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
assertEquals(3, provEvents.size());
@ -151,6 +137,20 @@ public abstract class TestInvokeAWSGatewayApiCommon {
assertTrue(recieveEvent);
}
private void assert200Response(final MockFlowFile bundle, final boolean requestWithInput) throws IOException {
bundle.assertContentEquals("{\"status\":\"200\"}".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Content-Type", "application/json");
// check any input FlowFile attributes were included in the Response FlowFile
if (requestWithInput) {
bundle.assertAttributeEquals("Foo", "Bar");
} else {
bundle.assertAttributeNotExists("Foo");
}
}
@Test
public void testOutputResponseRegardless() throws Exception {
addHandler(new GetOrHeadHandler(true));
@ -175,7 +175,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_NO_RETRY_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "404");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "Not Found");
bundle.assertAttributeEquals("Foo", "Bar");
@ -185,7 +185,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("{ \"error\": \"oops\"}".getBytes("UTF-8"));
bundle1.assertContentEquals("{ \"error\": \"oops\"}".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "404");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "Not Found");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -217,7 +217,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_NO_RETRY_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "404");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "Not Found");
bundle.assertAttributeEquals("outputBody", "{ \"error\": \"oops\"}");
@ -228,7 +228,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("{ \"error\": \"oops\"}".getBytes("UTF-8"));
bundle1.assertContentEquals("{ \"error\": \"oops\"}".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "404");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "Not Found");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -239,8 +239,6 @@ public abstract class TestInvokeAWSGatewayApiCommon {
public void testOutputResponseSetMimeTypeToResponseContentType() throws Exception {
addHandler(new GetOrHeadHandler());
String statusUrl = "/status/200";
setupEndpointAndRegion();
runner.setProperty(InvokeAWSGatewayApi.PROP_RESOURCE_NAME, "/status/200");
@ -263,7 +261,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("outputBody", "{\"status\":\"200\"}");
@ -274,7 +272,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes("UTF-8"));
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -285,7 +283,6 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Test
public void testOutputResponseRegardlessWithOutputInAttributeLarge() throws Exception {
addHandler(new GetLargeHandler(true));
String statusUrl = "/status/200";
setupEndpointAndRegion();
@ -310,7 +307,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_NO_RETRY_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "404");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "Not Found");
bundle.assertAttributeEquals("outputBody", "{\"name\":\"Lo");
@ -356,7 +353,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
@ -366,7 +363,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
bundle1.assertContentEquals("/status/200".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -401,7 +398,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// original flow file (+all attributes from response)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
@ -413,7 +410,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
bundle1.assertContentEquals("/status/200".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -445,7 +442,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals("outputBody", "{\"status\":\"200\"}");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
@ -476,7 +473,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes("UTF-8"));
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Content-Type", "application/json");
@ -507,7 +504,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes("UTF-8"));
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Content-Type", "application/json");
@ -537,14 +534,14 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// status code, status message, no ff content
// server response message body into attribute of ff
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
bundle1.assertContentEquals("".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals("outputBody", "{\"status\":\"200\"}");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
}
@Test
public void test500() throws Exception {
public void test500() {
addHandler(new GetOrHeadHandler());
setupEndpointAndRegion();
@ -576,7 +573,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void test300() throws Exception {
public void test300() {
addHandler(new GetOrHeadHandler());
setupEndpointAndRegion();
@ -607,7 +604,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void test304() throws Exception {
public void test304() {
addHandler(new GetOrHeadHandler());
setupEndpointAndRegion();
@ -639,7 +636,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void test400() throws Exception {
public void test400() {
addHandler(new GetOrHeadHandler());
setupEndpointAndRegion();
@ -670,7 +667,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void test400WithPenalizeNoRetry() throws Exception {
public void test400WithPenalizeNoRetry() {
addHandler(new GetOrHeadHandler());
setupEndpointAndRegion();
@ -701,7 +698,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void test412() throws Exception {
public void test412() {
addHandler(new GetOrHeadHandler());
setupEndpointAndRegion();
@ -753,14 +750,14 @@ public abstract class TestInvokeAWSGatewayApiCommon {
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
bundle1.assertContentEquals("".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -790,14 +787,14 @@ public abstract class TestInvokeAWSGatewayApiCommon {
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
bundle1.assertContentEquals("".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -809,7 +806,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPostWithMimeType() throws Exception {
public void testPostWithMimeType() {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
@ -829,7 +826,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPostWithEmptyELExpression() throws Exception {
public void testPostWithEmptyELExpression() {
addHandler(new MutativeMethodHandler(MutativeMethod.POST,
InvokeAWSGatewayApi.DEFAULT_CONTENT_TYPE));
@ -848,7 +845,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPostWithContentTypeProperty() throws Exception {
public void testPostWithContentTypeProperty() {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
@ -868,7 +865,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPostWithEmptyBodySet() throws Exception {
public void testPostWithEmptyBodySet() {
final String suppliedMimeType = "";
addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
@ -888,9 +885,8 @@ public abstract class TestInvokeAWSGatewayApiCommon {
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RESPONSE_NAME, 1);
}
@Test
public void testPutWithMimeType() throws Exception {
public void testPutWithMimeType() {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.PUT, suppliedMimeType));
@ -910,7 +906,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPutWithEmptyELExpression() throws Exception {
public void testPutWithEmptyELExpression() {
addHandler(new MutativeMethodHandler(MutativeMethod.PUT, InvokeAWSGatewayApi.DEFAULT_CONTENT_TYPE));
setupEndpointAndRegion();
@ -927,7 +923,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPutWithContentTypeProperty() throws Exception {
public void testPutWithContentTypeProperty() {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.PUT, suppliedMimeType));
setupEndpointAndRegion();
@ -965,13 +961,13 @@ public abstract class TestInvokeAWSGatewayApiCommon {
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
bundle1.assertContentEquals("".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -1002,13 +998,13 @@ public abstract class TestInvokeAWSGatewayApiCommon {
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
bundle1.assertContentEquals("".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -1020,7 +1016,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPatchWithMimeType() throws Exception {
public void testPatchWithMimeType() {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.PATCH, suppliedMimeType));
@ -1040,7 +1036,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPatchWithEmptyELExpression() throws Exception {
public void testPatchWithEmptyELExpression() {
addHandler(new MutativeMethodHandler(MutativeMethod.PATCH, InvokeAWSGatewayApi.DEFAULT_CONTENT_TYPE));
setupEndpointAndRegion();
@ -1058,7 +1054,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testPatchWithContentTypeProperty() throws Exception {
public void testPatchWithContentTypeProperty() {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.PATCH, suppliedMimeType));
@ -1096,13 +1092,13 @@ public abstract class TestInvokeAWSGatewayApiCommon {
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
bundle1.assertContentEquals("".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -1130,13 +1126,13 @@ public abstract class TestInvokeAWSGatewayApiCommon {
runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE).get(0);
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes("UTF-8"));
bundle1.assertContentEquals("{\"status\":\"200\"}".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
@ -1167,7 +1163,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
//original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ_NAME).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
@ -1177,7 +1173,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
//server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE_NAME).get(0);
bundle1.assertContentEquals("Bar".getBytes("UTF-8"));
bundle1.assertContentEquals("Bar".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("dynamicHeader", "yes!");
@ -1186,7 +1182,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testReadTimeout() throws Exception {
public void testReadTimeout() {
addHandler(new ReadTimeoutHandler());
setupEndpointAndRegion();
@ -1215,7 +1211,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testConnectFailBadPort() throws Exception {
public void testConnectFailBadPort() {
addHandler(new GetOrHeadHandler());
setupEndpointAndRegion();
@ -1245,7 +1241,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test
public void testConnectFailBadHost() throws Exception {
public void testConnectFailBadHost() {
addHandler(new GetOrHeadHandler());
setupEndpointAndRegion();
@ -1274,8 +1270,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
@Test(expected = java.lang.AssertionError.class)
public void testArbitraryRequestFailsValidation() throws Exception {
public void testArbitraryRequestFailsValidation() {
setupEndpointAndRegion();
runner.setProperty(InvokeAWSGatewayApi.PROP_RESOURCE_NAME, "/status/200");
@ -1299,24 +1294,21 @@ public abstract class TestInvokeAWSGatewayApiCommon {
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, "http://nifi.apache.org/");
runner.setProperty(InvokeAWSGatewayApi.PROP_RESOURCE_NAME, "/status/200");
runner.setProperty(InvokeAWSGatewayApi.PROXY_HOST, "${proxy.host}");
try {
runner.run();
Assert.fail();
} catch (AssertionError e) {
// Expect assertion error when proxy port isn't set but host is.
}
runner.setProperty(InvokeAWSGatewayApi.PROXY_HOST, "${proxy.host}");
final AssertionError aePort = assertThrows(AssertionError.class, () -> runner.run());
assertEquals("Processor has 1 validation failures:\n" +
"'Proxy Host and Port' is invalid because If Proxy Host or Proxy Port is set, both must be set\n",
aePort.getMessage());
runner.setProperty(InvokeAWSGatewayApi.PROXY_HOST_PORT, "${proxy.port}");
// Expect assertion error when proxy password isn't set but username is.
runner.setProperty(InvokeAWSGatewayApi.PROXY_USERNAME, "${proxy.username}");
try {
runner.run();
Assert.fail();
} catch (AssertionError e) {
// Expect assertion error when proxy password isn't set but host is.
}
final AssertionError aePassword = assertThrows(AssertionError.class, () -> runner.run());
assertEquals("Processor has 1 validation failures:\n" +
"'Proxy User and Password' is invalid because If Proxy Username or Proxy Password is set, both must be set\n",
aePassword.getMessage());
runner.setProperty(InvokeAWSGatewayApi.PROXY_PASSWORD, "${proxy.password}");
createFlowFiles(runner);
@ -1334,7 +1326,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
//original flow file (+attributes)
final MockFlowFile bundle = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
@ -1344,62 +1336,26 @@ public abstract class TestInvokeAWSGatewayApiCommon {
//server response message body into payload of ff
final MockFlowFile bundle1 = runner
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE).get(0);
bundle1.assertContentEquals("http://nifi.apache.org/status/200".getBytes("UTF-8"));
bundle1.assertContentEquals("http://nifi.apache.org/status/200".getBytes(StandardCharsets.UTF_8));
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
bundle1.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
}
/*
@Test
public void testOnPropertyModified() throws Exception {
final InvokeHTTP processor = new InvokeHTTP();
final Field regexAttributesToSendField = InvokeHTTP.class.getDeclaredField("regexAttributesToSend");
regexAttributesToSendField.setAccessible(true);
assertNull(regexAttributesToSendField.get(processor));
// Set Attributes to Send.
processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, null, "uuid");
assertNotNull(regexAttributesToSendField.get(processor));
// Null clear Attributes to Send. NIFI-1125: Throws NullPointerException.
processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "uuid", null);
assertNull(regexAttributesToSendField.get(processor));
// Set Attributes to Send.
processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, null, "uuid");
assertNotNull(regexAttributesToSendField.get(processor));
// Clear Attributes to Send with empty string.
processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "uuid", "");
assertNull(regexAttributesToSendField.get(processor));
}
*/
public static void createFlowFiles(final TestRunner testRunner)
throws UnsupportedEncodingException {
public static void createFlowFiles(final TestRunner testRunner) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
attributes.put("Foo", "Bar");
testRunner.enqueue("Hello".getBytes("UTF-8"), attributes);
testRunner.enqueue("Hello".getBytes(StandardCharsets.UTF_8), attributes);
}
protected static class DateHandler extends AbstractHandler {
private String dateString;
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
dateString = request.getHeader("Date");
response.setStatus(200);
response.setContentType("text/plain");
response.getWriter().println("Way to go!");
@ -1413,8 +1369,6 @@ public abstract class TestInvokeAWSGatewayApiCommon {
private final MutativeMethod method;
private final String expectedContentType;
private String headerToTrack;
private String trackedHeaderValue;
public MutativeMethodHandler(final MutativeMethod method) {
this(method, "application/plain-text");
@ -1426,31 +1380,21 @@ public abstract class TestInvokeAWSGatewayApiCommon {
this.expectedContentType = expectedContentType;
}
private void setHeaderToTrack(String headerToTrack) {
this.headerToTrack = headerToTrack;
}
public String getTrackedHeaderValue() {
return trackedHeaderValue;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
if (method.name().equals(request.getMethod())) {
if (this.expectedContentType.isEmpty()) {
// with nothing set, aws defaults to form encoded
Assert.assertEquals(request.getHeader("Content-Type"),
"application/x-www-form-urlencoded; charset=UTF-8");
// with nothing set, aws defaults to no Content-Type
Assert.assertNull("Content-Type", request.getHeader("Content-Type"));
} else {
assertEquals(this.expectedContentType, request.getHeader("Content-Type"));
}
final String body = request.getReader().readLine();
this.trackedHeaderValue = baseRequest.getHttpFields().get(headerToTrack);
if (this.expectedContentType.isEmpty()) {
Assert.assertNull(body);
@ -1462,9 +1406,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
response.setContentType("text/plain");
response.setContentLength(0);
}
}
}
public static class GetOrHeadHandler extends AbstractHandler {
@ -1480,10 +1422,10 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
final int status = Integer.valueOf(target.substring("/status".length() + 1));
final int status = Integer.parseInt(target.substring("/status".length() + 1));
response.setStatus(status);
if (!force404 && "GET".equalsIgnoreCase(request.getMethod())) {
@ -1495,9 +1437,10 @@ public abstract class TestInvokeAWSGatewayApiCommon {
// this will be treated as an error
// the target content must be json
target = "{\"status\":\"moved\"}";
} else {
target = String.format("{\"status\":\"%d\"}", status);
}
response.setContentType("application/json");
target = String.format("{\"status\":\"%d\"}", status);
response.setContentLength(target.length());
response.setHeader("Cache-Control", "public,max-age=1");
@ -1516,16 +1459,12 @@ public abstract class TestInvokeAWSGatewayApiCommon {
writer.flush();
}
}
}
}
public static class GetLargeHandler extends AbstractHandler {
private boolean force404 = false;
public GetLargeHandler() {
}
private final boolean force404;
public GetLargeHandler(boolean force404) {
this.force404 = force404;
@ -1533,10 +1472,10 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
final int status = Integer.valueOf(target.substring("/status".length() + 1));
final int status = Integer.parseInt(target.substring("/status".length() + 1));
response.setStatus(status);
response.setContentType("text/plain");
@ -1574,10 +1513,10 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
final int status = Integer.valueOf(target.substring("/status".length() + 1));
final int status = Integer.parseInt(target.substring("/status".length() + 1));
response.setStatus(status);
response.setContentType("text/plain");
@ -1596,7 +1535,6 @@ public abstract class TestInvokeAWSGatewayApiCommon {
response.setContentType("text/plain");
response.setContentLength(0);
}
}
}
@ -1604,18 +1542,17 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) {
baseRequest.setHandled(true);
if ("DELETE".equalsIgnoreCase(request.getMethod())) {
final int status = Integer.valueOf(target.substring("/status".length() + 1));
final int status = Integer.parseInt(target.substring("/status".length() + 1));
response.setStatus(status);
response.setContentLength(0);
} else {
response.setStatus(404);
response.setContentType("text/plain");
response.setContentLength(0);
}
response.setContentLength(0);
}
}
@ -1623,11 +1560,11 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
if ("OPTIONS".equalsIgnoreCase(request.getMethod())) {
final int status = Integer.valueOf(target.substring("/status".length() + 1));
final int status = Integer.parseInt(target.substring("/status".length() + 1));
response.setStatus(status);
response.setContentType("application/json");
target = String.format("{\"status\":\"%d\"}", status);
@ -1649,13 +1586,13 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
if ("Get".equalsIgnoreCase(request.getMethod())) {
String headerValue = request.getHeader("Foo");
response.setHeader("dynamicHeader", request.getHeader("dynamicHeader"));
final int status = Integer.valueOf(target.substring("/status".length() + 1));
final int status = Integer.parseInt(target.substring("/status".length() + 1));
response.setStatus(status);
response.setContentLength(headerValue.length());
response.setContentType("text/plain");
@ -1676,7 +1613,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
if ("Get".equalsIgnoreCase(request.getMethod())) {
@ -1687,7 +1624,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
String headerValue = request.getHeader("Foo");
headerValue = headerValue == null ? "" : headerValue;
final int status = Integer.valueOf(target.substring("/status".length() + 1));
final int status = Integer.parseInt(target.substring("/status".length() + 1));
response.setStatus(status);
response.setContentLength(headerValue.length());
response.setContentType("text/plain");
@ -1708,7 +1645,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
HttpServletResponse response) throws IOException {
baseRequest.setHandled(true);
if ("Get".equalsIgnoreCase(request.getMethod())) {

View File

@ -19,6 +19,8 @@ package org.apache.nifi.processors.aws.wag;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.http.AmazonHttpClient;
import com.amazonaws.http.apache.client.impl.SdkHttpClient;
import com.amazonaws.internal.TokenBucket;
import com.amazonaws.metrics.RequestMetricCollector;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.client.methods.HttpUriRequest;
@ -36,6 +38,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -47,7 +50,6 @@ import static org.mockito.Mockito.times;
public class TestInvokeAmazonGatewayApiMock {
private TestRunner runner = null;
private InvokeAWSGatewayApi mockGetApi = null;
private SdkHttpClient mockSdkClient = null;
@Before
@ -55,8 +57,8 @@ public class TestInvokeAmazonGatewayApiMock {
mockSdkClient = Mockito.mock(SdkHttpClient.class);
ClientConfiguration clientConfig = new ClientConfiguration();
mockGetApi = new InvokeAWSGatewayApi(
new AmazonHttpClient(clientConfig, mockSdkClient, null));
InvokeAWSGatewayApi mockGetApi = new InvokeAWSGatewayApi(
new AmazonHttpClient(clientConfig, mockSdkClient, RequestMetricCollector.NONE, new TokenBucket()));
runner = TestRunners.newTestRunner(mockGetApi);
runner.setValidateExpressionUsage(false);
@ -133,7 +135,7 @@ public class TestInvokeAmazonGatewayApiMock {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
attributes.put("Foo", "Bar");
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
runner.enqueue("Hello".getBytes(StandardCharsets.UTF_8), attributes);
// execute
runner.assertValid();
runner.run(1);
@ -184,7 +186,7 @@ public class TestInvokeAmazonGatewayApiMock {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
attributes.put("Foo", "Bar");
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
runner.enqueue("Hello".getBytes(StandardCharsets.UTF_8), attributes);
// execute
runner.assertValid();
runner.run(1);

View File

@ -27,9 +27,9 @@
<properties>
<!-- keep AWS 1.x until NIFI-8287 -->
<aws-java-sdk-version>1.11.1016</aws-java-sdk-version>
<!-- Do not upgrade to 1.14.x+ until https://github.com/awslabs/amazon-kinesis-client/issues/796 is fixed -->
<aws-kinesis-client-library-version>1.13.3</aws-kinesis-client-library-version>
<aws-java-sdk-version>1.12.133</aws-java-sdk-version>
<!-- keep KCL 1.x until NIFI-8531 (blocked by NIFI-8287) -->
<aws-kinesis-client-library-version>1.14.7</aws-kinesis-client-library-version>
</properties>
<dependencyManagement>