From ea2519e3ea5edba8b91b37e9d815bc711b3a997c Mon Sep 17 00:00:00 2001 From: Joey Frazee Date: Thu, 11 Jan 2018 14:00:42 -0700 Subject: [PATCH] NIFI-4748 - Add endpoint override to Kinesis processors Signed-off-by: Pierre Villard This closes #2399. --- .../processors/aws/AbstractAWSProcessor.java | 1 + .../kinesis/firehose/PutKinesisFirehose.java | 2 +- .../aws/kinesis/stream/PutKinesisStream.java | 2 +- ...utKinesisFirehoseWithEndpointOverride.java | 81 ++++++++++++++++++ ...TPutKinesisStreamWithEndpointOverride.java | 83 +++++++++++++++++++ 5 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehoseWithEndpointOverride.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStreamWithEndpointOverride.java diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index 5d244c0848..2e271cf078 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -224,6 +224,7 @@ public abstract class AbstractAWSProcessor properties = Collections.unmodifiableList( Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, - PROXY_HOST,PROXY_HOST_PORT)); + PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE)); /** * Max buffer size 1 MB diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java index cafc82cfa2..7694fd4441 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java @@ -79,7 +79,7 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor { public static final List properties = Collections.unmodifiableList( Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, - AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST,PROXY_HOST_PORT)); + AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE)); /** A random number generator for cases where partition key is not available */ protected Random randomParitionKeyGenerator = new Random(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehoseWithEndpointOverride.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehoseWithEndpointOverride.java new file mode 100644 index 0000000000..f12b44ea42 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehoseWithEndpointOverride.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY; + +import java.util.List; + +import org.apache.nifi.processors.aws.s3.FetchS3Object; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +// This integration test can be run against a mock Kenesis Firehose such as +// https://github.com/localstack/localstack +public class ITPutKinesisFirehoseWithEndpointOverride { + + private TestRunner runner; + + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(PutKinesisFirehose.class); + runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "access key"); + runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key"); + runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "test"); + runner.setProperty(PutKinesisFirehose.ENDPOINT_OVERRIDE, "http://localhost:4573"); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + + System.clearProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY); + } + + @Test + public void testIntegrationSuccess() throws Exception { + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 1); + + final List ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); + final MockFlowFile out = ffs.iterator().next(); + + out.assertContentEquals("test".getBytes()); + } + + @Test + public void testIntegrationFailedBadStreamName() throws Exception { + runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "notfound"); + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_FAILURE, 1); + + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStreamWithEndpointOverride.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStreamWithEndpointOverride.java new file mode 100644 index 0000000000..62830686ad --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStreamWithEndpointOverride.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.stream; + +import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY; + +import java.util.List; + +import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +// This integration test can be run against a mock Kenesis such as +// https://github.com/mhart/kinesalite or https://github.com/localstack/localstack +public class ITPutKinesisStreamWithEndpointOverride { + + private TestRunner runner; + + @Before + public void setUp() throws Exception { + System.setProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true"); + + runner = TestRunners.newTestRunner(PutKinesisStream.class); + runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "test"); + runner.setProperty(PutKinesisStream.ACCESS_KEY, "access key"); + runner.setProperty(PutKinesisStream.SECRET_KEY, "secret key"); + runner.setProperty(PutKinesisStream.ENDPOINT_OVERRIDE, "http://localhost:4568"); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + + System.clearProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY); + } + + @Test + public void testIntegrationSuccess() throws Exception { + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); + + final List ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); + final MockFlowFile out = ffs.iterator().next(); + + out.assertContentEquals("test".getBytes()); + } + + @Test + public void testIntegrationFailedBadStreamName() throws Exception { + runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "notfound"); + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1); + + } + +}