NIFI-2892 Implement AWS Kinesis Stream Consume Processor

This closes #4822.

Co-authored-by: uday <udaygkale@gmail.com>

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Chris Sampson authored 2021-05-12 20:03:28 +02:00; committed by Peter Turcsanyi
parent ab8b7444b5
commit a274c12bbb
29 changed files with 3221 additions and 161 deletions

View File

@@ -223,36 +223,40 @@ public class StandardProcessorTestRunner implements TestRunner {
if (++finishedCount == 1) {
unscheduledRun = true;
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
}
unSchedule();
}
} catch (final Exception e) {
}
}
if (!unscheduledRun) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
}
unSchedule();
}
if (stopOnFinish) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor, context);
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e);
}
stop();
}
} finally {
context.disableExpressionValidation();
}
}
public void unSchedule() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
}
}
public void stop() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor, context);
} catch (final Exception e) {
Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e);
}
}
@Override
public void shutdown() {
try {

View File

@@ -171,6 +171,20 @@ public interface TestRunner {
*/
void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait);
/**
* Invokes all methods on the Processor that are annotated with the
* {@link org.apache.nifi.annotation.lifecycle.OnUnscheduled @OnUnscheduled} annotation. If
* any of these methods throws an Exception, the Unit Test will fail
*/
void unSchedule();
/**
* Invokes all methods on the Processor that are annotated with the
* {@link org.apache.nifi.annotation.lifecycle.OnStopped @OnStopped} annotation. If
* any of these methods throws an Exception, the Unit Test will fail
*/
void stop();
/**
* Invokes all methods on the Processor that are annotated with the
* {@link org.apache.nifi.annotation.lifecycle.OnShutdown @OnShutdown} annotation. If

View File

@@ -47,6 +47,11 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${aws-kinesis-client-library-version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>

View File

@@ -70,8 +70,7 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
* @param context the process context
*/
protected void onScheduledUsingControllerService(ProcessContext context) {
- final ClientType awsClient = createClient(context, getCredentialsProvider(context), createConfiguration(context));
- this.client = awsClient;
+ this.client = createClient(context, getCredentialsProvider(context), createConfiguration(context));
super.initializeRegionAndEndpoint(context);
}

View File

@@ -50,8 +50,10 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
- import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@@ -68,7 +70,7 @@ import org.apache.nifi.ssl.SSLContextService;
*
*/
@Deprecated
- public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
+ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractSessionFactoryProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("FlowFiles are routed to success relationship").build();
@@ -151,6 +153,8 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
protected volatile ClientType client;
protected volatile Region region;
+ private static final Pattern VPCE_ENDPOINT_PATTERN = Pattern.compile("^(?:.+[vpce-][a-z0-9-]+\\.)?([a-z0-9-]+)$");
// If protocol is changed to be a property, ensure other uses are also changed
protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
protected static final String DEFAULT_USER_AGENT = "NiFi";
@@ -167,7 +171,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
for (final Regions region : Regions.values()) {
values.add(createAllowableValue(region));
}
- return values.toArray(new AllowableValue[values.size()]);
+ return values.toArray(new AllowableValue[0]);
}
@Override
@@ -265,18 +269,41 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
@OnScheduled
public void onScheduled(final ProcessContext context) {
- final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context));
- this.client = awsClient;
+ this.client = createClient(context, getCredentials(context), createConfiguration(context));
initializeRegionAndEndpoint(context);
}
/*
* Allow optional override of onTrigger with the ProcessSessionFactory where required for AWS processors (e.g. ConsumeKinesisStream)
*
* @see AbstractProcessor
*/
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
final ProcessSession session = sessionFactory.createSession();
try {
onTrigger(context, session);
session.commit();
} catch (final Throwable t) {
session.rollback(true);
throw t;
}
}
/*
* Default to requiring the "standard" onTrigger with a single ProcessSession
*/
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
protected void initializeRegionAndEndpoint(ProcessContext context) {
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
final String region = context.getProperty(REGION).getValue();
if (region != null) {
this.region = Region.getRegion(Regions.fromName(region));
- client.setRegion(this.region);
+ if (client != null) {
+ client.setRegion(this.region);
+ }
} else {
this.region = null;
}
@@ -284,11 +311,11 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
// if the endpoint override has been configured, set the endpoint.
// (per Amazon docs this should only be configured at client creation)
- if (getSupportedPropertyDescriptors().contains(ENDPOINT_OVERRIDE)) {
+ if (client != null && getSupportedPropertyDescriptors().contains(ENDPOINT_OVERRIDE)) {
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue());
if (!urlstr.isEmpty()) {
getLogger().info("Overriding endpoint with {}", new Object[]{urlstr});
getLogger().info("Overriding endpoint with {}", urlstr);
if (urlstr.endsWith(".vpce.amazonaws.com")) {
String region = parseRegionForVPCE(urlstr);
@@ -312,7 +339,6 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
private String parseRegionForVPCE(String url) {
int index = url.length() - ".vpce.amazonaws.com".length();
- Pattern VPCE_ENDPOINT_PATTERN = Pattern.compile("^(?:.+[vpce-][a-z0-9-]+\\.)?([a-z0-9-]+)$");
Matcher matcher = VPCE_ENDPOINT_PATTERN.matcher(url.substring(0, index));
if (matcher.matches()) {
@@ -362,7 +388,6 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
}
return new AnonymousAWSCredentials();
}
@OnShutdown
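The relocated VPCE_ENDPOINT_PATTERN is easier to follow with a concrete value. A self-contained sketch using a hypothetical VPC endpoint host (not taken from the commit):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VpceRegionDemo {
    private static final Pattern VPCE_ENDPOINT_PATTERN = Pattern.compile("^(?:.+[vpce-][a-z0-9-]+\\.)?([a-z0-9-]+)$");

    public static void main(final String[] args) {
        final String url = "vpce-0123abcd-vwxyz.kinesis.us-west-2.vpce.amazonaws.com"; // hypothetical endpoint
        // mirror parseRegionForVPCE: drop the ".vpce.amazonaws.com" suffix, then capture the trailing region label
        final String host = url.substring(0, url.length() - ".vpce.amazonaws.com".length());
        final Matcher matcher = VPCE_ENDPOINT_PATTERN.matcher(host);
        System.out.println(matcher.matches() ? matcher.group(1) : "no match"); // prints "us-west-2"
    }
}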

View File

@@ -19,10 +19,8 @@ package org.apache.nifi.processors.aws.kinesis;
import java.util.ArrayList;
import java.util.List;
- import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
- import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import com.amazonaws.AmazonWebServiceClient;
@@ -32,66 +30,35 @@ import com.amazonaws.AmazonWebServiceClient;
*/
public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient>
extends AbstractAWSCredentialsProviderProcessor<ClientType> {
- /**
- * Kinesis put record response error message
- */
- public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
- public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .displayName("Message Batch Size")
- .name("message-batch-size")
- .description("Batch size for messages (1-500).")
- .defaultValue("250")
- .required(false)
- .addValidator(StandardValidators.createLongValidator(1, 500, true))
- .sensitive(false)
- .build();
- public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
- .name("max-message-buffer-size")
- .displayName("Max message buffer size (MB)")
- .description("Max message buffer size in Mega-bytes")
- .defaultValue("1 MB")
- .required(false)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .sensitive(false)
- .build();
/**
* Max buffer size 1 MB
*/
public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
- protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate,
- String message) {
- flowFileCandidate = session.putAttribute(flowFileCandidate, message,
- "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE );
- session.transfer(flowFileCandidate, REL_FAILURE);
+ private void handleFlowFileTooBig(final ProcessSession session, final FlowFile flowFileCandidate, final String message) {
+ final FlowFile tooBig = session.putAttribute(flowFileCandidate, message,
+ "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE);
+ session.transfer(tooBig, REL_FAILURE);
getLogger().error("Failed to publish to kinesis records {} because the size was greater than {} bytes",
- new Object[]{flowFileCandidate, MAX_MESSAGE_SIZE});
- return flowFileCandidate;
+ tooBig, MAX_MESSAGE_SIZE);
}
protected List<FlowFile> filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, String message) {
- List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
+ final List<FlowFile> flowFiles = new ArrayList<>(batchSize);
long currentBufferSizeBytes = 0;
for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
- FlowFile flowFileCandidate = session.get();
- if ( flowFileCandidate == null )
- break;
- if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
- flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, message);
- continue;
- }
- currentBufferSizeBytes += flowFileCandidate.getSize();
- flowFiles.add(flowFileCandidate);
+ final FlowFile flowFileCandidate = session.get();
+ if (flowFileCandidate != null) {
+ if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
+ handleFlowFileTooBig(session, flowFileCandidate, message);
+ continue;
+ }
+ currentBufferSizeBytes += flowFileCandidate.getSize();
+ flowFiles.add(flowFileCandidate);
+ }
}
return flowFiles;
}
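For context, a publishing subclass calls this helper from its single-session onTrigger to assemble a bounded batch. A sketch under assumed names (BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB and AWS_KINESIS_ERROR_MESSAGE now live on PutKinesisStream, per the later hunks):

// inside onTrigger(final ProcessContext context, final ProcessSession session)
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
final List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, AWS_KINESIS_ERROR_MESSAGE);
if (flowFiles.isEmpty()) {
    return; // nothing to send on this trigger
}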

View File

@@ -16,27 +16,24 @@
*/
package org.apache.nifi.processors.aws.kinesis.stream;
- import org.apache.nifi.components.PropertyDescriptor;
- import org.apache.nifi.expression.ExpressionLanguageScope;
- import org.apache.nifi.processor.ProcessContext;
- import org.apache.nifi.processor.util.StandardValidators;
- import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
+ import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
/**
* This class provides the base class for Kinesis client processors
*/
public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisClient> {
public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder()
.name("kinesis-stream-name")
.displayName("Amazon Kinesis Stream Name")
.description("The name of Kinesis Stream")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -45,6 +42,7 @@ public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesis
* Create client using aws credentials provider. This is the preferred way for creating clients
*/
@Override
@SuppressWarnings("deprecated")
protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials provider");
@@ -60,7 +58,6 @@ public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesis
protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials");
- return new AmazonKinesisClient(credentials, config);
+ return createClient(context, new AWSStaticCredentialsProvider(credentials), config);
}
}
}

View File

@@ -56,6 +56,14 @@ The following binary components are provided under the Apache Software License v
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons BeanUtils
The following NOTICE information applies:
Apache Commons BeanUtils
Copyright 2000-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
(ASLv2) Amazon Web Services SDK
The following NOTICE information applies:
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.

View File

@@ -45,6 +45,23 @@
<version>1.14.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatchmetrics</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@@ -64,21 +81,21 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-ssl-context-service-api</artifactId>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-cloudwatchmetrics</artifactId>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-sts</artifactId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock-record-utils</artifactId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.14.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <version>1.14.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,728 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean2;
import org.apache.commons.beanutils.FluentPropertyBeanIntrospector;
import org.apache.commons.beanutils.PropertyUtilsBean;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor;
import org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRaw;
import org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRecord;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordFieldType;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@Tags({"amazon", "aws", "kinesis", "consume", "stream"})
@CapabilityDescription("Reads data from the specified AWS Kinesis stream and outputs a FlowFile for every processed Record (raw) " +
" or a FlowFile for a batch of processed records if a Record Reader and Record Writer are configured. " +
"At-least-once delivery of all Kinesis Records within the Stream while the processor is running. " +
"AWS Kinesis Client Library can take several seconds to initialise before starting to fetch data. " +
"Uses DynamoDB for check pointing and CloudWatch (optional) for metrics. " +
"Ensure that the credentials provided have access to DynamoDB and CloudWatch (optional) along with Kinesis.")
@WritesAttributes({
@WritesAttribute(attribute = AbstractKinesisRecordProcessor.AWS_KINESIS_PARTITION_KEY,
description = "Partition key of the (last) Kinesis Record read from the Shard"),
@WritesAttribute(attribute = AbstractKinesisRecordProcessor.AWS_KINESIS_SHARD_ID,
description = "Shard ID from which the Kinesis Record was read"),
@WritesAttribute(attribute = AbstractKinesisRecordProcessor.AWS_KINESIS_SEQUENCE_NUMBER,
description = "The unique identifier of the (last) Kinesis Record within its Shard"),
@WritesAttribute(attribute = AbstractKinesisRecordProcessor.AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP,
description = "Approximate arrival timestamp of the (last) Kinesis Record read from the stream"),
@WritesAttribute(attribute = "mime.type",
description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer (if configured)"),
@WritesAttribute(attribute = "record.count",
description = "Number of records written to the FlowFiles by the Record Writer (if configured)"),
@WritesAttribute(attribute = "record.error.message",
description = "This attribute provides on failure the error message encountered by the Record Reader or Record Writer (if configured)")
})
@DynamicProperties({
@DynamicProperty(name="Kinesis Client Library (KCL) Configuration property name",
description="Override default KCL Configuration properties with required values. Supports setting of values via the \"with\" " +
"methods on the KCL Configuration class. Specify the property to be set without the leading prefix, e.g. \"maxInitialisationAttempts\" " +
"will call \"withMaxInitialisationAttempts\" and set the provided value. Only supports setting of simple property values, e.g. String, " +
"int, long and boolean. Does not allow override of KCL Configuration settings handled by non-dynamic processor properties.",
expressionLanguageScope = ExpressionLanguageScope.NONE, value="Value to set in the KCL Configuration property")
})
@SystemResourceConsideration(resource = SystemResource.CPU, description = "Kinesis Client Library is used to create a Worker thread for consumption of Kinesis Records. " +
"The Worker is initialised and started when this Processor has been triggered. It runs continually, spawning Kinesis Record Processors as required " +
"to fetch Kinesis Records. The Worker Thread (and any child Record Processor threads) are not controlled by the normal NiFi scheduler as part of the " +
"Concurrent Thread pool and are not released until this processor is stopped.")
@SystemResourceConsideration(resource = SystemResource.NETWORK, description = "Kinesis Client Library will continually poll for new Records, " +
"requesting up to a maximum number of Records/bytes per call. This can result in sustained network usage.")
@SeeAlso(PutKinesisStream.class)
public class ConsumeKinesisStream extends AbstractKinesisStreamProcessor {
static final AllowableValue TRIM_HORIZON = new AllowableValue(
InitialPositionInStream.TRIM_HORIZON.toString(),
InitialPositionInStream.TRIM_HORIZON.toString(),
"Start reading at the last untrimmed record in the shard in the system, which is the oldest data record in the shard."
);
static final AllowableValue LATEST = new AllowableValue(
InitialPositionInStream.LATEST.toString(),
InitialPositionInStream.LATEST.toString(),
"Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard."
);
static final AllowableValue AT_TIMESTAMP = new AllowableValue(
InitialPositionInStream.AT_TIMESTAMP.toString(),
InitialPositionInStream.AT_TIMESTAMP.toString(), "Start reading from the position denoted by a specific time stamp, provided in the value Timestamp."
);
public static final PropertyDescriptor APPLICATION_NAME = new PropertyDescriptor.Builder()
.displayName("Application Name")
.name("amazon-kinesis-stream-application-name")
.description("The Kinesis stream reader application name.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true).build();
public static final PropertyDescriptor INITIAL_STREAM_POSITION = new PropertyDescriptor.Builder()
.displayName("Initial Stream Position")
.name("amazon-kinesis-stream-initial-position")
.description("Initial position to read Kinesis streams.")
.allowableValues(LATEST, TRIM_HORIZON, AT_TIMESTAMP)
.defaultValue(LATEST.getValue())
.required(true).build();
public static final PropertyDescriptor STREAM_POSITION_TIMESTAMP = new PropertyDescriptor.Builder()
.displayName("Stream Position Timestamp")
.name("amazon-kinesis-stream-position-timestamp")
.description("Timestamp position in stream from which to start reading Kinesis Records. " +
"Required if " + INITIAL_STREAM_POSITION.getDescription() + " is " + AT_TIMESTAMP.getDisplayName() + ". " +
"Uses the Timestamp Format to parse value into a Date.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) // customValidate checks the value against TIMESTAMP_FORMAT
.dependsOn(INITIAL_STREAM_POSITION, AT_TIMESTAMP)
.required(false).build();
public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.displayName("Timestamp Format")
.name("amazon-kinesis-stream-timestamp-format")
.description("Format to use for parsing the " + STREAM_POSITION_TIMESTAMP.getDisplayName() + " into a Date " +
"and converting the Kinesis Record's Approximate Arrival Timestamp into a FlowFile attribute.")
.addValidator((subject, input, context) -> {
if (StringUtils.isNotBlank(input)) {
try {
DateTimeFormatter.ofPattern(input);
} catch (Exception e) {
return new ValidationResult.Builder().valid(false).subject(subject).input(input)
.explanation("Must be a valid java.time.DateTimeFormatter pattern, e.g. " + RecordFieldType.TIMESTAMP.getDefaultFormat())
.build();
}
}
return new ValidationResult.Builder().valid(true).subject(subject).build();
})
.defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat())
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true).build();
public static final PropertyDescriptor FAILOVER_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Failover Timeout")
.name("amazon-kinesis-stream-failover-timeout")
.description("Kinesis Client Library failover timeout")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
.required(true).build();
public static final PropertyDescriptor GRACEFUL_SHUTDOWN_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Graceful Shutdown Timeout")
.name("amazon-kinesis-stream-graceful-shutdown-timeout")
.description("Kinesis Client Library graceful shutdown timeout")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("20 secs")
.required(true).build();
public static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder()
.displayName("Checkpoint Interval")
.name("amazon-kinesis-stream-checkpoint-interval")
.description("Interval between Kinesis checkpoints")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("3 secs")
.required(true).build();
public static final PropertyDescriptor NUM_RETRIES = new PropertyDescriptor.Builder()
.displayName("Retry Count")
.name("amazon-kinesis-stream-retry-count")
.description("Number of times to retry a Kinesis operation (process record, checkpoint, shutdown)")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("10")
.required(true).build();
public static final PropertyDescriptor RETRY_WAIT = new PropertyDescriptor.Builder()
.displayName("Retry Wait")
.name("amazon-kinesis-stream-retry-wait")
.description("Interval between Kinesis operation retries (process record, checkpoint, shutdown)")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("1 sec")
.required(true).build();
public static final PropertyDescriptor DYNAMODB_ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
.displayName("DynamoDB Override")
.name("amazon-kinesis-stream-dynamodb-override")
.description("DynamoDB override to use non-AWS deployments")
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false).build();
public static final PropertyDescriptor REPORT_CLOUDWATCH_METRICS = new PropertyDescriptor.Builder()
.displayName("Report Metrics to CloudWatch")
.name("amazon-kinesis-stream-cloudwatch-flag")
.description("Whether to report Kinesis usage metrics to CloudWatch.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(true).build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("amazon-kinesis-stream-record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for reading received messages." +
" The Kinesis Stream name can be referred to by Expression Language '${" +
AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY + "}' to access a schema." +
" If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("amazon-kinesis-stream-record-writer")
.displayName("Record Writer")
.description("The Record Writer to use for serializing Records to an output FlowFile." +
" The Kinesis Stream name can be referred to by Expression Language '${" +
AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY + "}' to access a schema." +
" If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE)
.required(true)
.build();
public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
.name("parse.failure")
.description("If a message from Kinesis cannot be parsed using the configured Record Reader" +
" or failed to be written by the configured Record Writer," +
" the contents of the message will be routed to this Relationship as its own individual FlowFile.")
.build();
public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
Arrays.asList(
// Kinesis Stream specific properties
KINESIS_STREAM_NAME, APPLICATION_NAME, RECORD_READER, RECORD_WRITER, REGION, ENDPOINT_OVERRIDE,
DYNAMODB_ENDPOINT_OVERRIDE, INITIAL_STREAM_POSITION, STREAM_POSITION_TIMESTAMP, TIMESTAMP_FORMAT,
FAILOVER_TIMEOUT, GRACEFUL_SHUTDOWN_TIMEOUT, CHECKPOINT_INTERVAL, NUM_RETRIES, RETRY_WAIT, REPORT_CLOUDWATCH_METRICS,
// generic AWS processor properties
TIMEOUT, AWS_CREDENTIALS_PROVIDER_SERVICE, PROXY_CONFIGURATION_SERVICE
)
);
private static final Map<String, PropertyDescriptor> DISALLOWED_DYNAMIC_KCL_PROPERTIES = new HashMap<String, PropertyDescriptor>(){{
put("regionName", REGION);
put("timestampAtInitialPositionInStream", STREAM_POSITION_TIMESTAMP);
put("initialPositionInStream", INITIAL_STREAM_POSITION);
put("dynamoDBEndpoint", DYNAMODB_ENDPOINT_OVERRIDE);
put("kinesisEndpoint", ENDPOINT_OVERRIDE);
put("failoverTimeMillis", FAILOVER_TIMEOUT);
put("gracefulShutdownMillis", GRACEFUL_SHUTDOWN_TIMEOUT);
}};
private static final Object WORKER_LOCK = new Object();
private static final String WORKER_THREAD_NAME_TEMPLATE = ConsumeKinesisStream.class.getSimpleName() + "-" + Worker.class.getSimpleName() + "-";
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
private static final Set<Relationship> RECORD_RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_PARSE_FAILURE)));
private static final PropertyUtilsBean PROPERTY_UTILS_BEAN;
private static final BeanUtilsBean BEAN_UTILS_BEAN;
static {
PROPERTY_UTILS_BEAN = new PropertyUtilsBean();
PROPERTY_UTILS_BEAN.addBeanIntrospector(new FluentPropertyBeanIntrospector("with"));
final ConvertUtilsBean2 convertUtilsBean2 = new ConvertUtilsBean2() {
@SuppressWarnings("unchecked") // generic Enum conversion from String property values
@Override
public Object convert(final String value, final Class clazz) {
if (clazz.isEnum()) {
return Enum.valueOf(clazz, value);
} else {
return super.convert(value, clazz);
}
}
};
BEAN_UTILS_BEAN = new BeanUtilsBean(convertUtilsBean2, PROPERTY_UTILS_BEAN);
}
private volatile boolean isRecordReaderSet;
private volatile boolean isRecordWriterSet;
private volatile Worker worker;
final AtomicReference<WorkerStateChangeListener.WorkerState> workerState = new AtomicReference<>(null);
private final AtomicBoolean stopped = new AtomicBoolean(false);
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
@Override
public Set<Relationship> getRelationships() {
return isRecordReaderSet && isRecordWriterSet ? RECORD_RELATIONSHIPS : RELATIONSHIPS;
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (RECORD_READER.equals(descriptor)) {
isRecordReaderSet = StringUtils.isNotEmpty(newValue);
} else if (RECORD_WRITER.equals(descriptor)) {
isRecordWriterSet = StringUtils.isNotEmpty(newValue);
}
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.dynamic(true)
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.addValidator(this::validateDynamicKCLConfigProperty)
.expressionLanguageSupported(ExpressionLanguageScope.NONE);
return builder.build();
}
private ValidationResult validateDynamicKCLConfigProperty(final String subject, final String input, final ValidationContext context) {
final ValidationResult.Builder validationResult = new ValidationResult.Builder().subject(subject).input(input);
if (!subject.matches("^(?!with)[a-zA-Z]\\w*$")) {
return validationResult
.explanation("Property name must not have a prefix of \"with\", must start with a letter and contain only letters, numbers or underscores")
.valid(false).build();
}
if (DISALLOWED_DYNAMIC_KCL_PROPERTIES.keySet().stream().anyMatch(k -> k.equalsIgnoreCase(subject))) {
return validationResult
.explanation(String.format("Use \"%s\" instead of a dynamic property", DISALLOWED_DYNAMIC_KCL_PROPERTIES.get(subject).getDisplayName()))
.valid(false).build();
}
final KinesisClientLibConfiguration kclTemp = new KinesisClientLibConfiguration("validate", "validate", null, "validate");
try {
final String propName = StringUtils.uncapitalize(subject);
if (!PROPERTY_UTILS_BEAN.isWriteable(kclTemp, propName)) {
return validationResult
.explanation(String.format("Kinesis Client Library Configuration property with name %s does not exist or is not writable", StringUtils.capitalize(subject)))
.valid(false).build();
}
BEAN_UTILS_BEAN.setProperty(kclTemp, propName, input);
} catch (IllegalAccessException e) {
return validationResult
.explanation(String.format("Kinesis Client Library Configuration property with name %s is not accessible", StringUtils.capitalize(subject)))
.valid(false).build();
} catch (InvocationTargetException e) {
return buildDynamicPropertyBeanValidationResult(validationResult, subject, input, e.getTargetException().getLocalizedMessage());
} catch (IllegalArgumentException e) {
return buildDynamicPropertyBeanValidationResult(validationResult, subject, input, e.getLocalizedMessage());
}
return validationResult.valid(true).build();
}
private ValidationResult buildDynamicPropertyBeanValidationResult(final ValidationResult.Builder validationResult,
final String subject, final String input, final String message) {
return validationResult
.explanation(
String.format("Kinesis Client Library Configuration property with name %s cannot be used with value \"%s\" : %s",
StringUtils.capitalize(subject), input, message)
)
.valid(false).build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Set<ValidationResult> validationResults = new HashSet<>(super.customValidate(validationContext));
DateTimeFormatter dateTimeFormatter = null;
try {
dateTimeFormatter = getDateTimeFormatter(validationContext);
} catch (IllegalArgumentException iae) {
validationResults.add(new ValidationResult.Builder().valid(false)
.subject(TIMESTAMP_FORMAT.getName())
.explanation(String.format("%s must be a valid java.time.DateTimeFormatter format", TIMESTAMP_FORMAT.getDisplayName()))
.build()
);
}
if (InitialPositionInStream.AT_TIMESTAMP == getInitialPositionInStream(validationContext)) {
if (!validationContext.getProperty(STREAM_POSITION_TIMESTAMP).isSet()) {
validationResults.add(new ValidationResult.Builder().valid(false)
.subject(STREAM_POSITION_TIMESTAMP.getName())
.explanation(String.format("%s must be provided when %s is %s", STREAM_POSITION_TIMESTAMP.getDisplayName(),
INITIAL_STREAM_POSITION.getDisplayName(), AT_TIMESTAMP.getDisplayName()))
.build()
);
} else if (dateTimeFormatter != null) {
try {
// check the streamTimestamp can be formatted
getStartStreamTimestamp(validationContext, dateTimeFormatter);
} catch (Exception e) {
validationResults.add(new ValidationResult.Builder().valid(false)
.subject(STREAM_POSITION_TIMESTAMP.getName())
.explanation(String.format("%s must be parsable by %s", STREAM_POSITION_TIMESTAMP.getDisplayName(),
TIMESTAMP_FORMAT.getDisplayName()))
.build());
}
}
}
if (isRecordReaderSet && !isRecordWriterSet) {
validationResults.add(new ValidationResult.Builder()
.subject(RECORD_WRITER.getName())
.explanation(String.format("%s must be set if %s is set in order to write FlowFiles as Records.",
RECORD_WRITER.getDisplayName(), RECORD_READER.getDisplayName()))
.valid(false)
.build());
} else if (isRecordWriterSet && !isRecordReaderSet) {
validationResults.add(new ValidationResult.Builder()
.subject(RECORD_READER.getName())
.explanation(String.format("%s must be set if %s is set in order to write FlowFiles as Records.",
RECORD_READER.getDisplayName(), RECORD_WRITER.getDisplayName()))
.valid(false)
.build());
}
return validationResults;
}
@OnScheduled
@Override
public void onScheduled(ProcessContext context) {
stopped.set(false);
workerState.set(null);
super.onScheduled(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
if (worker == null) {
synchronized (WORKER_LOCK) {
if (worker == null) {
final String workerId = generateWorkerId();
getLogger().info("Starting Kinesis Worker {}", workerId);
// create worker (WorkerState will be CREATED)
worker = prepareWorker(context, sessionFactory, workerId);
// initialise and start Worker (will set WorkerState to INITIALIZING and attempt to start)
new Thread(worker, WORKER_THREAD_NAME_TEMPLATE + workerId).start();
}
}
} else {
// after a Worker is registered successfully, nothing has to be done at onTrigger
// new sessions are created when new messages are consumed by the Worker
// and if the WorkerState is unexpectedly SHUT_DOWN, then we don't want to immediately re-enter onTrigger
context.yield();
if (!stopped.get() && WorkerStateChangeListener.WorkerState.SHUT_DOWN == workerState.get()) {
throw new ProcessException("Worker has shutdown unexpectedly, possibly due to a configuration issue; check logs for details");
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// intentionally blank (using onTrigger with ProcessSessionFactory above instead)
}
@OnStopped
public void stopConsuming(final ProcessContext context) {
if (worker != null) {
synchronized (WORKER_LOCK) {
if (worker != null) {
// indicate whether the processor has been Stopped; the Worker can be marked as SHUT_DOWN but still be waiting
// for ShardConsumers/RecordProcessors to complete, etc.
stopped.set(true);
final boolean success = shutdownWorker(context);
worker = null;
workerState.set(null);
if (!success) {
getLogger().warn("One or more problems while shutting down Kinesis Worker, see logs for details");
}
}
}
}
}
private synchronized Worker prepareWorker(final ProcessContext context, final ProcessSessionFactory sessionFactory, final String workerId) {
final IRecordProcessorFactory factory = prepareRecordProcessorFactory(context, sessionFactory);
final KinesisClientLibConfiguration kinesisClientLibConfiguration =
prepareKinesisClientLibConfiguration(context, workerId);
final Worker.Builder workerBuilder = prepareWorkerBuilder(context, kinesisClientLibConfiguration, factory);
getLogger().info("Kinesis Worker prepared for application {} to process stream {} as worker ID {}...",
getApplicationName(context), getStreamName(context), workerId);
return workerBuilder.build();
}
private IRecordProcessorFactory prepareRecordProcessorFactory(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
return () -> {
if (isRecordReaderSet && isRecordWriterSet) {
return new KinesisRecordProcessorRecord(
sessionFactory, getLogger(), getStreamName(context), getClient().getEndpointPrefix(),
getKinesisEndpoint(context).orElse(null), getCheckpointIntervalMillis(context),
getRetryWaitMillis(context), getNumRetries(context), getDateTimeFormatter(context),
getReaderFactory(context), getWriterFactory(context)
);
} else {
return new KinesisRecordProcessorRaw(
sessionFactory, getLogger(), getStreamName(context), getClient().getEndpointPrefix(),
getKinesisEndpoint(context).orElse(null), getCheckpointIntervalMillis(context),
getRetryWaitMillis(context), getNumRetries(context), getDateTimeFormatter(context)
);
}
};
}
/*
* Developer note: if setting KCL configuration explicitly from processor properties, be sure to add them to the
* DISALLOWED_DYNAMIC_KCL_PROPERTIES list so Dynamic Properties can't be used to override the static properties
*/
KinesisClientLibConfiguration prepareKinesisClientLibConfiguration(final ProcessContext context, final String workerId) {
@SuppressWarnings("deprecated")
final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
getApplicationName(context),
getStreamName(context),
getCredentialsProvider(context),
workerId
)
.withCommonClientConfig(getClient().getClientConfiguration())
.withRegionName(getRegion().getName())
.withFailoverTimeMillis(getFailoverTimeMillis(context))
.withShutdownGraceMillis(getGracefulShutdownMillis(context));
final InitialPositionInStream initialPositionInStream = getInitialPositionInStream(context);
if (InitialPositionInStream.AT_TIMESTAMP == initialPositionInStream) {
kinesisClientLibConfiguration.withTimestampAtInitialPositionInStream(getStartStreamTimestamp(context));
} else {
kinesisClientLibConfiguration.withInitialPositionInStream(initialPositionInStream);
}
getDynamoDBOverride(context).ifPresent(kinesisClientLibConfiguration::withDynamoDBEndpoint);
getKinesisEndpoint(context).ifPresent(kinesisClientLibConfiguration::withKinesisEndpoint);
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
.keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList());
dynamicProperties.forEach(descriptor -> {
final String name = descriptor.getName();
final String value = context.getProperty(descriptor).getValue();
try {
BEAN_UTILS_BEAN.setProperty(kinesisClientLibConfiguration, StringUtils.uncapitalize(name), value);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new ProcessException(String.format("Unable to set Kinesis Client Library Configuration property %s with value %s", StringUtils.capitalize(name), value), e);
}
});
return kinesisClientLibConfiguration;
}
Worker.Builder prepareWorkerBuilder(final ProcessContext context, final KinesisClientLibConfiguration kinesisClientLibConfiguration,
final IRecordProcessorFactory factory) {
final Worker.Builder workerBuilder = new Worker.Builder()
.config(kinesisClientLibConfiguration)
.kinesisClient(getClient())
.workerStateChangeListener(workerState::set)
.recordProcessorFactory(factory);
if (!isReportCloudWatchMetrics(context)) {
workerBuilder.metricsFactory(new NullMetricsFactory());
}
return workerBuilder;
}
private boolean shutdownWorker(final ProcessContext context) {
boolean success = true;
try {
if (!worker.hasGracefulShutdownStarted()) {
getLogger().info("Requesting Kinesis Worker shutdown");
final Future<Boolean> shutdown = worker.startGracefulShutdown();
// allow 2 seconds longer than the graceful period for shutdown before cancelling the task
if (Boolean.FALSE.equals(shutdown.get(getGracefulShutdownMillis(context) + 2_000L, TimeUnit.MILLISECONDS))) {
getLogger().warn("Kinesis Worker shutdown did not complete in time, cancelling");
success = false;
} else {
getLogger().info("Kinesis Worker shutdown");
}
}
} catch (InterruptedException | TimeoutException | ExecutionException e) {
getLogger().warn("Problem while shutting down Kinesis Worker: {}", e.getLocalizedMessage(), e);
success = false;
}
return success;
}
private String generateWorkerId() {
try {
return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
} catch (UnknownHostException e) {
throw new ProcessException(e);
}
}
private String getApplicationName(final PropertyContext context) {
return StringUtils.trimToEmpty(context.getProperty(APPLICATION_NAME).getValue());
}
private String getStreamName(final PropertyContext context) {
return StringUtils.trimToEmpty(context.getProperty(KINESIS_STREAM_NAME).getValue());
}
private long getFailoverTimeMillis(final PropertyContext context) {
return context.getProperty(FAILOVER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
}
private long getGracefulShutdownMillis(final PropertyContext context) {
return context.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
}
private long getCheckpointIntervalMillis(final PropertyContext context) {
return context.getProperty(CHECKPOINT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
}
private int getNumRetries(final PropertyContext context) {
return context.getProperty(NUM_RETRIES).asInteger();
}
private long getRetryWaitMillis(final PropertyContext context) {
return context.getProperty(RETRY_WAIT).asTimePeriod(TimeUnit.MILLISECONDS);
}
private boolean isReportCloudWatchMetrics(final PropertyContext context) {
return context.getProperty(REPORT_CLOUDWATCH_METRICS).asBoolean();
}
private Optional<String> getKinesisEndpoint(final PropertyContext context) {
return context.getProperty(ENDPOINT_OVERRIDE).isSet()
? Optional.of(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()))
: Optional.empty();
}
private Optional<String> getDynamoDBOverride(final PropertyContext context) {
return context.getProperty(DYNAMODB_ENDPOINT_OVERRIDE).isSet()
? Optional.of(StringUtils.trimToEmpty(context.getProperty(DYNAMODB_ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()))
: Optional.empty();
}
private RecordReaderFactory getReaderFactory(final PropertyContext context) {
return context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
}
private RecordSetWriterFactory getWriterFactory(final PropertyContext context) {
return context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
}
private InitialPositionInStream getInitialPositionInStream(final PropertyContext context) {
return InitialPositionInStream.valueOf(StringUtils.trimToEmpty(context.getProperty(INITIAL_STREAM_POSITION).getValue()));
}
private DateTimeFormatter getDateTimeFormatter(final PropertyContext context) {
return DateTimeFormatter.ofPattern(context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue());
}
private Date getStartStreamTimestamp(final PropertyContext context) {
return getStartStreamTimestamp(context, getDateTimeFormatter(context));
}
private Date getStartStreamTimestamp(final PropertyContext context, final DateTimeFormatter dateTimeFormatter) {
final String streamTimestamp = context.getProperty(STREAM_POSITION_TIMESTAMP).getValue();
return new Date(
LocalDateTime.parse(streamTimestamp, dateTimeFormatter).atZone(ZoneId.systemDefault()) // parse date/time with system timezone
.withZoneSameInstant(ZoneOffset.UTC) // convert to UTC
.toInstant().toEpochMilli() // convert to epoch milliseconds for creating Date
);
}
}
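As a usage sketch, configuring the new processor in a unit test might look as follows; stream and application names are hypothetical, the dynamic property mirrors the "maxInitialisationAttempts" example from the @DynamicProperty documentation above, and an AWS Credentials Provider controller service must still be added before the processor validates:

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
runner.setProperty(ConsumeKinesisStream.KINESIS_STREAM_NAME, "my-stream"); // hypothetical stream name
runner.setProperty(ConsumeKinesisStream.APPLICATION_NAME, "my-consumer-app"); // also names the DynamoDB lease table
runner.setProperty(ConsumeKinesisStream.INITIAL_STREAM_POSITION, "TRIM_HORIZON");
runner.setProperty("maxInitialisationAttempts", "5"); // dynamic property applied via the KCL "with" setter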

View File

@@ -16,16 +16,11 @@
*/
package org.apache.nifi.processors.aws.kinesis.stream;
- import java.io.ByteArrayOutputStream;
- import java.nio.ByteBuffer;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Random;
- import com.amazonaws.services.kinesis.AmazonKinesisClient;
- import com.amazonaws.services.kinesis.model.PutRecordsRequest;
- import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
- import com.amazonaws.services.kinesis.model.PutRecordsResult;
- import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -33,6 +28,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -42,11 +38,15 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
+ import com.amazonaws.services.kinesis.AmazonKinesisClient;
+ import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+ import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+ import com.amazonaws.services.kinesis.model.PutRecordsResult;
+ import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
+ import java.io.ByteArrayOutputStream;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Random;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -58,7 +58,12 @@ import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
@WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"),
@WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"),
@WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")})
@SeeAlso(ConsumeKinesisStream.class)
public class PutKinesisStream extends AbstractKinesisStreamProcessor {
/**
* Kinesis put record response error message
*/
public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
/**
* Kinesis put record response error code
@@ -78,6 +83,31 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
.required(false)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.displayName("Message Batch Size")
.name("message-batch-size")
.description("Batch size for messages (1-500).")
.defaultValue("250")
.required(false)
.addValidator(StandardValidators.createLongValidator(1, 500, true))
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
.name("max-message-buffer-size")
.displayName("Max message buffer size (MB)")
.description("Max message buffer size in Mega-bytes")
.defaultValue("1 MB")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.sensitive(false)
.build();
public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractKinesisStreamProcessor.KINESIS_STREAM_NAME)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));

View File

@@ -0,0 +1,322 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream.record;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public abstract class AbstractKinesisRecordProcessor implements IRecordProcessor {
public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
public static final String AWS_KINESIS_PARTITION_KEY = "aws.kinesis.partition.key";
public static final String AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP = "aws.kinesis.approximate.arrival.timestamp";
public static final String KINESIS_RECORD_SCHEMA_KEY = "kinesis.name";
static final Base64.Encoder BASE_64_ENCODER = Base64.getEncoder();
private final ProcessSessionFactory sessionFactory;
private final ComponentLog log;
private final String streamName;
private final String transitUriPrefix;
private final long checkpointIntervalMillis;
private final long retryWaitMillis;
private final int numRetries;
private final DateTimeFormatter dateTimeFormatter;
private String kinesisShardId;
private long nextCheckpointTimeInMillis;
private boolean processingRecords = false;
AbstractKinesisRecordProcessor(final ProcessSessionFactory sessionFactory, final ComponentLog log, final String streamName,
final String endpointPrefix, final String kinesisEndpoint,
final long checkpointIntervalMillis, final long retryWaitMillis,
final int numRetries, final DateTimeFormatter dateTimeFormatter) {
this.sessionFactory = sessionFactory;
this.log = log;
this.streamName = streamName;
this.checkpointIntervalMillis = checkpointIntervalMillis;
this.retryWaitMillis = retryWaitMillis;
this.numRetries = numRetries;
this.dateTimeFormatter = dateTimeFormatter;
this.transitUriPrefix = StringUtils.isBlank(kinesisEndpoint) ? String.format("http://%s.amazonaws.com", endpointPrefix) : kinesisEndpoint;
}
@Override
public void initialize(final InitializationInput initializationInput) {
if (initializationInput.getPendingCheckpointSequenceNumber() != null) {
log.warn("Initializing record processor for stream: {} / shard {}; from sequence number: {}; indicates previously uncheckpointed sequence number: {}",
streamName, initializationInput.getShardId(), initializationInput.getExtendedSequenceNumber(), initializationInput.getPendingCheckpointSequenceNumber());
} else {
log.debug("Initializing record processor for stream: {} / shard: {}; from sequence number: {}",
streamName, initializationInput.getShardId(), initializationInput.getExtendedSequenceNumber());
}
this.kinesisShardId = initializationInput.getShardId();
// ensure we don't immediately checkpoint
this.nextCheckpointTimeInMillis = System.currentTimeMillis() + checkpointIntervalMillis;
}
@Override
public void processRecords(final ProcessRecordsInput processRecordsInput) {
if (log.isDebugEnabled()) {
log.debug("Processing {} records from {}; cache entry: {}; cache exit: {}; millis behind latest: {}",
processRecordsInput.getRecords().size(), kinesisShardId,
processRecordsInput.getCacheEntryTime() != null ? dateTimeFormatter.format(processRecordsInput.getCacheEntryTime().atZone(ZoneId.systemDefault())) : null,
processRecordsInput.getCacheExitTime() != null ? dateTimeFormatter.format(processRecordsInput.getCacheExitTime().atZone(ZoneId.systemDefault())) : null,
processRecordsInput.getMillisBehindLatest());
}
ProcessSession session = null;
try {
final List<Record> records = processRecordsInput.getRecords();
if (!records.isEmpty()) {
final List<FlowFile> flowFiles = new ArrayList<>(records.size());
final StopWatch stopWatch = new StopWatch(true);
session = sessionFactory.createSession();
startProcessingRecords();
final int recordsTransformed = processRecordsWithRetries(records, flowFiles, session, stopWatch);
transferTo(ConsumeKinesisStream.REL_SUCCESS, session, records.size(), recordsTransformed, flowFiles);
session.commit();
processingRecords = false;
// if creating a Kinesis checkpoint fails, then the same record(s) can be retrieved again
checkpointOnceEveryCheckpointInterval(processRecordsInput.getCheckpointer());
}
} catch (final Exception e) {
log.error("Unable to fully process received Kinesis record(s) due to {}", e.getLocalizedMessage(), e);
// FlowFiles that are already committed will not get rolled back
if (session != null) {
session.rollback();
}
}
}
void startProcessingRecords() {
processingRecords = true;
}
private int processRecordsWithRetries(final List<Record> records, final List<FlowFile> flowFiles,
final ProcessSession session, final StopWatch stopWatch) {
int recordsTransformed = 0;
for (int r = 0; r < records.size(); r++) {
final Record kinesisRecord = records.get(r);
boolean processedSuccessfully = false;
for (int i = 0; !processedSuccessfully && i < numRetries; i++) {
processedSuccessfully = attemptProcessRecord(flowFiles, kinesisRecord, r == records.size() - 1, session, stopWatch);
}
if (processedSuccessfully) {
recordsTransformed++;
} else {
log.error("Couldn't process Kinesis record {}, skipping.", kinesisRecord);
}
}
return recordsTransformed;
}
private boolean attemptProcessRecord(final List<FlowFile> flowFiles, final Record kinesisRecord, final boolean lastRecord,
final ProcessSession session, final StopWatch stopWatch) {
boolean processedSuccessfully = false;
try {
processRecord(flowFiles, kinesisRecord, lastRecord, session, stopWatch);
processedSuccessfully = true;
} catch (final Exception e) {
log.error("Caught Exception while processing Kinesis record {}", kinesisRecord, e);
// backoff if we encounter an exception.
try {
Thread.sleep(retryWaitMillis);
} catch (InterruptedException ie) {
log.debug("Interrupted sleep during record processing back-off", ie);
}
}
return processedSuccessfully;
}
/**
* Process an individual {@link Record} and serialise to {@link FlowFile}
*
* @param flowFiles {@link List} of {@link FlowFile}s to be output after all processing is complete
* @param kinesisRecord the Kinesis {@link Record} to be processed
* @param lastRecord whether this is the last {@link Record} to be processed in this batch
* @param session {@link ProcessSession} into which {@link FlowFile}s will be transferred
* @param stopWatch {@link StopWatch} tracking how much time has been spent processing the current batch
*
* @throws RuntimeException if there are any unhandled Exceptions that should be retried
*/
abstract void processRecord(final List<FlowFile> flowFiles, final Record kinesisRecord, final boolean lastRecord,
final ProcessSession session, final StopWatch stopWatch);
void reportProvenance(final ProcessSession session, final FlowFile flowFile, final String partitionKey,
final String sequenceNumber, final StopWatch stopWatch) {
final String transitUri = StringUtils.isNotBlank(partitionKey) && StringUtils.isNotBlank(sequenceNumber)
? String.format("%s/%s/%s#%s", transitUriPrefix, kinesisShardId, partitionKey, sequenceNumber)
: String.format("%s/%s", transitUriPrefix, kinesisShardId);
session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
Map<String, String> getDefaultAttributes(final String sequenceNumber, final String partitionKey, final Date approximateArrivalTimestamp) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(AWS_KINESIS_SHARD_ID, kinesisShardId);
attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, sequenceNumber);
attributes.put(AWS_KINESIS_PARTITION_KEY, partitionKey);
if (approximateArrivalTimestamp != null) {
attributes.put(AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP,
dateTimeFormatter.format(approximateArrivalTimestamp.toInstant().atZone(ZoneId.systemDefault())));
}
return attributes;
}
void transferTo(final Relationship relationship, final ProcessSession session, final int recordsProcessed,
final int recordsTransformed, final List<FlowFile> flowFiles) {
session.adjustCounter("Records Processed", recordsProcessed, false);
if (!flowFiles.isEmpty()) {
session.adjustCounter("Records Transformed", recordsTransformed, false);
session.transfer(flowFiles, relationship);
}
}
private void checkpointOnceEveryCheckpointInterval(final IRecordProcessorCheckpointer checkpointer) {
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpointWithRetries(checkpointer);
nextCheckpointTimeInMillis = System.currentTimeMillis() + checkpointIntervalMillis;
}
}
@Override
public void shutdown(final ShutdownInput shutdownInput) {
log.debug("Shutting down Record Processor for shard: {} with reason: {}", kinesisShardId, shutdownInput.getShutdownReason());
// be sure to finish processing any records before shutdown on TERMINATE
if (ShutdownReason.TERMINATE == shutdownInput.getShutdownReason()) {
for (int i = 0; processingRecords && i < numRetries; i++) {
log.debug("Record Processor for shard {} still processing records, waiting before shutdown", kinesisShardId);
try {
Thread.sleep(retryWaitMillis);
} catch (InterruptedException ie) {
log.debug("Interrupted sleep while waiting for record processing to complete before shutdown (TERMINATE)", ie);
}
}
if (processingRecords) {
log.warn("Record Processor for shard {} still running, but maximum wait time elapsed, checkpoint will be attempted", kinesisShardId);
}
}
checkpointWithRetries(shutdownInput.getCheckpointer());
}
private void checkpointWithRetries(final IRecordProcessorCheckpointer checkpointer) {
log.debug("Checkpointing shard " + kinesisShardId);
try {
for (int i = 0; i < numRetries; i++) {
if (attemptCheckpoint(checkpointer, i)) {
break;
}
}
} catch (ShutdownException se) {
// Ignore checkpoint if the processor instance has been shutdown (fail over).
log.info("Caught shutdown exception, skipping checkpoint.", se);
} catch (InvalidStateException e) {
// This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
}
}
private boolean attemptCheckpoint(final IRecordProcessorCheckpointer checkpointer, final int attempt) throws ShutdownException, InvalidStateException {
boolean success = false;
try {
checkpointer.checkpoint();
success = true;
} catch (ThrottlingException e) {
// Backoff and re-attempt checkpoint upon transient failures
if (attempt >= (numRetries - 1)) {
log.error("Checkpoint failed after {} attempts.", attempt + 1, e);
} else {
log.warn("Transient issue when checkpointing - attempt {} of {}", attempt + 1, numRetries, e);
try {
Thread.sleep(retryWaitMillis);
} catch (InterruptedException ie) {
log.debug("Interrupted sleep during checkpoint back-off", ie);
}
}
}
return success;
}
ComponentLog getLogger() {
return log;
}
String getKinesisShardId() {
return kinesisShardId;
}
void setKinesisShardId(final String kinesisShardId) {
this.kinesisShardId = kinesisShardId;
}
long getNextCheckpointTimeInMillis() {
return nextCheckpointTimeInMillis;
}
void setNextCheckpointTimeInMillis(final long nextCheckpointTimeInMillis) {
this.nextCheckpointTimeInMillis = nextCheckpointTimeInMillis;
}
void setProcessingRecords(final boolean processingRecords) {
this.processingRecords = processingRecords;
}
}
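
For orientation, a rough sketch of how a record processor like this is hosted by the Kinesis Client Library; this is not the processor's actual wiring, and sessionFactory, log, credentialsProvider and workerId are assumed to exist in scope:

final IRecordProcessorFactory factory = () -> new KinesisRecordProcessorRaw(
        sessionFactory, log, "test-stream", "kinesis.us-east-1", null,
        60_000L, 1_000L, 10, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
final KinesisClientLibConfiguration kclConfig =
        new KinesisClientLibConfiguration("test-application", "test-stream", credentialsProvider, workerId)
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final Worker worker = new Worker.Builder()
        .config(kclConfig)
        .recordProcessorFactory(factory)
        .build();
Executors.newSingleThreadExecutor().submit(worker::run); // worker.shutdown() ends consumption gracefully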

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream.record;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.util.StopWatch;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Map;
public class KinesisRecordProcessorRaw extends AbstractKinesisRecordProcessor {
public KinesisRecordProcessorRaw(final ProcessSessionFactory sessionFactory, final ComponentLog log, final String streamName,
final String endpointPrefix, final String kinesisEndpoint,
final long checkpointIntervalMillis, final long retryWaitMillis,
final int numRetries, final DateTimeFormatter dateTimeFormatter) {
super(sessionFactory, log, streamName, endpointPrefix, kinesisEndpoint, checkpointIntervalMillis, retryWaitMillis,
numRetries, dateTimeFormatter);
}
@Override
void processRecord(final List<FlowFile> flowFiles, final Record kinesisRecord, final boolean lastRecord,
final ProcessSession session, final StopWatch stopWatch) {
final String partitionKey = kinesisRecord.getPartitionKey();
final String sequenceNumber = kinesisRecord.getSequenceNumber();
final Date approximateArrivalTimestamp = kinesisRecord.getApproximateArrivalTimestamp();
final byte[] data = kinesisRecord.getData() != null ? kinesisRecord.getData().array() : new byte[0];
FlowFile flowFile = session.create();
session.write(flowFile, out -> out.write(data));
if (getLogger().isDebugEnabled()) {
getLogger().debug("Sequence No: {}, Partition Key: {}, Data: {}", sequenceNumber, partitionKey, BASE_64_ENCODER.encodeToString(data));
}
reportProvenance(session, flowFile, partitionKey, sequenceNumber, stopWatch);
final Map<String, String> attributes = getDefaultAttributes(sequenceNumber, partitionKey, approximateArrivalTimestamp);
flowFile = session.putAllAttributes(flowFile, attributes);
flowFiles.add(flowFile);
}
}
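
In this raw (per-message) mode, each Kinesis Record becomes a single FlowFile carrying the attributes populated above; a hedged test-side sketch of the expected output (the partition key and content values are illustrative):

runner.assertAllFlowFilesTransferred(ConsumeKinesisStream.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS).get(0);
out.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_PARTITION_KEY, "1");
out.assertAttributeExists(AbstractKinesisRecordProcessor.AWS_KINESIS_SEQUENCE_NUMBER);
out.assertAttributeExists(AbstractKinesisRecordProcessor.AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP);
out.assertContentEquals("horizon".getBytes());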

View File

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream.record;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StopWatch;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
public class KinesisRecordProcessorRecord extends AbstractKinesisRecordProcessor {
final RecordReaderFactory readerFactory;
final RecordSetWriterFactory writerFactory;
final Map<String, String> schemaRetrievalVariables;
private RecordSetWriter writer;
private OutputStream outputStream;
public KinesisRecordProcessorRecord(final ProcessSessionFactory sessionFactory, final ComponentLog log, final String streamName,
final String endpointPrefix, final String kinesisEndpoint,
final long checkpointIntervalMillis, final long retryWaitMillis,
final int numRetries, final DateTimeFormatter dateTimeFormatter,
final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory) {
super(sessionFactory, log, streamName, endpointPrefix, kinesisEndpoint, checkpointIntervalMillis, retryWaitMillis,
numRetries, dateTimeFormatter);
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
schemaRetrievalVariables = Collections.singletonMap(KINESIS_RECORD_SCHEMA_KEY, streamName);
}
@Override
void startProcessingRecords() {
super.startProcessingRecords();
outputStream = null;
writer = null;
}
@Override
void processRecord(final List<FlowFile> flowFiles, final Record kinesisRecord, final boolean lastRecord,
final ProcessSession session, final StopWatch stopWatch) {
boolean firstOutputRecord = true;
int recordCount = 0;
final byte[] data = kinesisRecord.getData() != null ? kinesisRecord.getData().array() : new byte[0];
FlowFile flowFile = null;
try (final InputStream in = new ByteArrayInputStream(data);
final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length, getLogger())
) {
org.apache.nifi.serialization.record.Record outputRecord;
final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
while ((outputRecord = recordSet.next()) != null) {
if (flowFiles.isEmpty()) {
flowFile = session.create();
flowFiles.add(flowFile);
// initialize the writer when the first record is read.
createWriter(flowFile, session, outputRecord);
}
final WriteResult writeResult = writer.write(outputRecord);
recordCount += writeResult.getRecordCount();
// complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet
if (lastRecord && !recordSet.isAnotherRecord()) {
completeFlowFile(flowFiles, session, recordCount, writeResult, kinesisRecord, stopWatch);
}
firstOutputRecord = false;
}
} catch (final MalformedRecordException | IOException | SchemaNotFoundException e) {
// write raw Kinesis Record to the parse failure relationship
getLogger().error("Failed to parse message from Kinesis Stream using configured Record Reader and Writer due to {}",
e.getLocalizedMessage(), e);
outputRawRecordOnException(firstOutputRecord, flowFile, flowFiles, session, data, kinesisRecord, e);
}
if (getLogger().isDebugEnabled()) {
getLogger().debug("Sequence No: {}, Partition Key: {}, Data: {}",
kinesisRecord.getSequenceNumber(), kinesisRecord.getPartitionKey(), BASE_64_ENCODER.encodeToString(data));
}
}
private void createWriter(final FlowFile flowFile, final ProcessSession session,
final org.apache.nifi.serialization.record.Record outputRecord)
throws IOException, SchemaNotFoundException {
final RecordSchema readerSchema = outputRecord.getSchema();
final RecordSchema writeSchema = writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
outputStream = session.write(flowFile);
writer = writerFactory.createWriter(getLogger(), writeSchema, outputStream, flowFile);
writer.beginRecordSet();
}
private void completeFlowFile(final List<FlowFile> flowFiles, final ProcessSession session, final int recordCount,
final WriteResult writeResult, final Record lastRecord, final StopWatch stopWatch)
throws IOException {
try {
writer.finishRecordSet();
} catch (IOException e) {
getLogger().error("Failed to finish record output due to {}", e.getLocalizedMessage(), e);
session.remove(flowFiles.get(0));
flowFiles.remove(0);
throw e;
} finally {
try {
writer.close();
outputStream.close();
} catch (final IOException e) {
getLogger().warn("Failed to close Record Writer due to {}", e.getLocalizedMessage(), e);
}
}
reportProvenance(session, flowFiles.get(0), null, null, stopWatch);
final Map<String, String> attributes = getDefaultAttributes(lastRecord);
attributes.put("record.count", String.valueOf(recordCount));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
flowFiles.set(0, session.putAllAttributes(flowFiles.get(0), attributes));
writer = null;
outputStream = null;
}
private void outputRawRecordOnException(final boolean firstOutputRecord, final FlowFile flowFile,
final List<FlowFile> flowFiles, final ProcessSession session,
final byte[] data, final Record kinesisRecord, final Exception e) {
if (firstOutputRecord && flowFile != null) {
session.remove(flowFile);
flowFiles.remove(0);
if (writer != null) {
try {
writer.close();
outputStream.close();
} catch (IOException ioe) {
getLogger().warn("Failed to close Record Writer due to {}", ioe.getLocalizedMessage(), ioe);
}
}
}
FlowFile failed = session.create();
session.write(failed, o -> o.write(data));
final Map<String, String> attributes = getDefaultAttributes(kinesisRecord);
final Throwable c = e.getCause() != null ? e.getCause() : e;
attributes.put("record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
failed = session.putAllAttributes(failed, attributes);
transferTo(ConsumeKinesisStream.REL_PARSE_FAILURE, session, 0, 0, Collections.singletonList(failed));
}
private Map<String, String> getDefaultAttributes(final Record kinesisRecord) {
final String partitionKey = kinesisRecord.getPartitionKey();
final String sequenceNumber = kinesisRecord.getSequenceNumber();
final Date approximateArrivalTimestamp = kinesisRecord.getApproximateArrivalTimestamp();
return getDefaultAttributes(sequenceNumber, partitionKey, approximateArrivalTimestamp);
}
}
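
When parsing fails, outputRawRecordOnException routes the unaltered Kinesis payload to the parse-failure relationship with a record.error.message attribute; a hedged sketch of what a test could assert:

final List<MockFlowFile> failures = runner.getFlowFilesForRelationship(ConsumeKinesisStream.REL_PARSE_FAILURE);
final MockFlowFile failed = failures.get(0);
failed.assertAttributeExists("record.error.message");
failed.assertAttributeExists(AbstractKinesisRecordProcessor.AWS_KINESIS_SEQUENCE_NUMBER);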

View File

@ -1,32 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.aws.s3.FetchS3Object
org.apache.nifi.processors.aws.s3.PutS3Object
org.apache.nifi.processors.aws.s3.DeleteS3Object
org.apache.nifi.processors.aws.s3.TagS3Object
org.apache.nifi.processors.aws.s3.ListS3
org.apache.nifi.processors.aws.sns.PutSNS
org.apache.nifi.processors.aws.sqs.GetSQS
org.apache.nifi.processors.aws.sqs.PutSQS
org.apache.nifi.processors.aws.sqs.DeleteSQS
org.apache.nifi.processors.aws.lambda.PutLambda
org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream
org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric
org.apache.nifi.processors.aws.wag.InvokeAWSGatewayApi

View File

@ -0,0 +1,55 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ConsumeKinesisStream</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>Streaming Versus Batch Processing</h1>
<p>
ConsumeKinesisStream retrieves all Kinesis Records that it encounters in the configured Kinesis Stream.
There are two common, broadly defined use cases.
</p>
<h3>Per-Message Use Case</h3>
<p>
By default, the Processor will create a separate FlowFile for each Kinesis Record (message) in the Stream
and add attributes for shard id, sequence number, etc.
</p>
<h3>Per-Batch Use Case</h3>
<p>
Another common use case is the desire to process all Kinesis Records retrieved from the Stream in a batch as a single FlowFile.
</p>
<p>
The ConsumeKinesisStream Processor can optionally be configured with a Record Reader and Record Writer.
When a Record Reader and Record Writer are configured, a single FlowFile will be created that will contain a Record
for each Record within the batch of Kinesis Records (messages), instead of a separate FlowFile per Kinesis Record.
</p>
<p>
The FlowFiles emitted in this mode will include the standard <code>record.*</code> attributes along with the same
Kinesis Shard ID, Sequence Number and Approximate Arrival Timestamp attributes as the per-message mode, but the
values will relate to the <b>last</b> Kinesis Record processed in the batch of messages that constitutes the FlowFile's content.
</p>
</body>
</html>
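
Per-batch (Record) mode is enabled by configuring both a Record Reader and a Record Writer; a minimal sketch using the JSON services that the unit tests below also exercise:

final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
final ControllerService reader = new JsonTreeReader();
final ControllerService writer = new JsonRecordSetWriter();
runner.addControllerService("record-reader", reader);
runner.addControllerService("record-writer", writer);
runner.enableControllerService(reader);
runner.enableControllerService(writer);
runner.setProperty(ConsumeKinesisStream.RECORD_READER, "record-reader");
runner.setProperty(ConsumeKinesisStream.RECORD_WRITER, "record-writer");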

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
import com.amazonaws.services.kinesis.model.ListStreamsResult;
import org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.After;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Executors;
import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY;
public abstract class ITConsumeKinesisStream {
static final String KINESIS_STREAM_NAME = "test-stream";
static final String APPLICATION_NAME = "test-application";
static final String REGION = System.getProperty("AWS_DEFAULT_REGION", Regions.US_EAST_1.getName());
protected TestRunner runner;
AmazonKinesis kinesis;
AmazonDynamoDB dynamoDB;
@Test
public void readHorizon() throws InterruptedException, IOException {
String partitionKey = "1";
kinesis.putRecord(KINESIS_STREAM_NAME, ByteBuffer.wrap("horizon".getBytes()), partitionKey);
startKCL(runner, InitialPositionInStream.TRIM_HORIZON);
runner.assertAllFlowFilesTransferred(ConsumeKinesisStream.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
final MockFlowFile out = ffs.iterator().next();
out.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_PARTITION_KEY, partitionKey);
out.assertContentEquals("horizon".getBytes());
}
@Test
public void readLatest() throws InterruptedException, IOException {
String partitionKey = "1";
kinesis.putRecord(KINESIS_STREAM_NAME, ByteBuffer.wrap("horizon".getBytes()), partitionKey);
startKCL(runner, InitialPositionInStream.LATEST);
kinesis.putRecord(KINESIS_STREAM_NAME, ByteBuffer.wrap("latest".getBytes()), partitionKey);
waitForKCLToProcessTheLatestMessage();
runner.assertAllFlowFilesTransferred(ConsumeKinesisStream.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
final MockFlowFile out = ffs.iterator().next();
out.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_PARTITION_KEY, partitionKey);
out.assertContentEquals("latest".getBytes());
}
private void waitForKCLToProcessTheLatestMessage() throws InterruptedException {
Thread.sleep(10_000);
}
private void startKCL(TestRunner runner, InitialPositionInStream initialPositionInStream) throws InterruptedException {
runner.setProperty(ConsumeKinesisStream.INITIAL_STREAM_POSITION, initialPositionInStream.name());
runner.assertValid();
Executors.newSingleThreadScheduledExecutor().submit((Runnable) runner::run);
Thread.sleep(30_000);
}
@After
public void tearDown() throws InterruptedException {
cleanupKinesis();
cleanupDynamoDB();
runner = null;
Thread.sleep(2_000);
System.clearProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
}
private void cleanupDynamoDB() {
if (dynamoDB != null) {
ListTablesResult tableResults = dynamoDB.listTables();
List<String> tableName = tableResults.getTableNames();
if (tableName.contains(APPLICATION_NAME)) {
dynamoDB.deleteTable(APPLICATION_NAME);
}
}
dynamoDB = null;
}
private void cleanupKinesis() {
if (kinesis != null) {
ListStreamsResult streamsResult = kinesis.listStreams();
List<String> streamNames = streamsResult.getStreamNames();
if (streamNames.contains(KINESIS_STREAM_NAME)) {
kinesis.deleteStream(new DeleteStreamRequest().withStreamName(KINESIS_STREAM_NAME));
}
}
kinesis = null;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream;
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY;
public class ITConsumeKinesisStreamConnectAWS extends ITConsumeKinesisStream {
private final static String CREDENTIALS_FILE =
System.getProperty("user.home") + "/aws-credentials.properties";
@Before
public void setUp() throws InterruptedException {
System.setProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
kinesis = AmazonKinesisClient.builder()
.withCredentials(new PropertiesFileCredentialsProvider(CREDENTIALS_FILE))
.withRegion(REGION)
.build();
kinesis.createStream(KINESIS_STREAM_NAME, 1);
dynamoDB = AmazonDynamoDBClient.builder()
.withCredentials(new PropertiesFileCredentialsProvider(CREDENTIALS_FILE))
.withRegion(REGION)
.build();
waitForKinesisToInitialize();
runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
runner.setProperty(ConsumeKinesisStream.APPLICATION_NAME, APPLICATION_NAME);
runner.setProperty(ConsumeKinesisStream.KINESIS_STREAM_NAME, KINESIS_STREAM_NAME);
runner.setProperty(ConsumeKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(ConsumeKinesisStream.REGION, REGION);
runner.setProperty(ConsumeKinesisStream.REPORT_CLOUDWATCH_METRICS, "false");
runner.assertValid();
}
private void waitForKinesisToInitialize() throws InterruptedException {
Thread.sleep(20_000);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY;
public class ITConsumeKinesisStreamEndpointOverride extends ITConsumeKinesisStream {
private static final String ACCESS_KEY = "test";
private static final String SECRET_KEY = "test";
private static final String KINESIS_STREAM_NAME = "test-stream";
private static final String APPLICATION_NAME = "test-application";
private static final String LOCAL_STACK_KINESIS_ENDPOINT_OVERRIDE = "http://localhost:4566";
private static final String LOCAL_STACK_DYNAMODB_ENDPOINT_OVERRIDE = "http://localhost:4566";
private final AWSCredentialsProvider awsCredentialsProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY));
private final AwsClientBuilder.EndpointConfiguration kinesisEndpointConfig =
new AwsClientBuilder.EndpointConfiguration(LOCAL_STACK_KINESIS_ENDPOINT_OVERRIDE, REGION);
private final AwsClientBuilder.EndpointConfiguration dynamoDBEndpointConfig =
new AwsClientBuilder.EndpointConfiguration(LOCAL_STACK_DYNAMODB_ENDPOINT_OVERRIDE, REGION);
@Before
public void setUp() throws InterruptedException {
System.setProperty(AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
kinesis = AmazonKinesisClient.builder()
.withEndpointConfiguration(kinesisEndpointConfig)
.withCredentials(awsCredentialsProvider)
.build();
kinesis.createStream(KINESIS_STREAM_NAME, 1);
dynamoDB = AmazonDynamoDBClient.builder()
.withEndpointConfiguration(dynamoDBEndpointConfig)
.withCredentials(awsCredentialsProvider)
.build();
waitForKinesisToInitialize();
runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
runner.setProperty(ConsumeKinesisStream.APPLICATION_NAME, APPLICATION_NAME);
runner.setProperty(ConsumeKinesisStream.KINESIS_STREAM_NAME, KINESIS_STREAM_NAME);
runner.setProperty(ConsumeKinesisStream.ACCESS_KEY, ACCESS_KEY);
runner.setProperty(ConsumeKinesisStream.SECRET_KEY, SECRET_KEY);
runner.setProperty(ConsumeKinesisStream.REGION, REGION);
runner.setProperty(ConsumeKinesisStream.REPORT_CLOUDWATCH_METRICS, "false");
runner.setProperty(ConsumeKinesisStream.ENDPOINT_OVERRIDE, LOCAL_STACK_KINESIS_ENDPOINT_OVERRIDE + "/kinesis");
runner.setProperty(ConsumeKinesisStream.DYNAMODB_ENDPOINT_OVERRIDE, LOCAL_STACK_DYNAMODB_ENDPOINT_OVERRIDE + "/dynamodb");
runner.assertValid();
}
private void waitForKinesisToInitialize() throws InterruptedException {
Thread.sleep(1000);
}
}

View File

@ -16,14 +16,6 @@
*/
package org.apache.nifi.processors.aws.kinesis.stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -31,6 +23,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* This test contains both unit and integration tests (integration tests are ignored by default).
* Running the integration tests may result in failures due to the provisioned capacity of the Kinesis stream, which depends on the number of shards.

View File

@ -16,11 +16,6 @@
*/
package org.apache.nifi.processors.aws.kinesis.stream;
import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY;
import java.util.List;
import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -28,6 +23,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY;
// This integration test can be run against a mock Kinesis such as
// https://github.com/mhart/kinesalite or https://github.com/localstack/localstack
public class ITPutKinesisStreamWithEndpointOverride {

View File

@ -0,0 +1,513 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class TestConsumeKinesisStream {
private final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
@Before
public void setUp() throws InitializationException {
runner.setProperty(ConsumeKinesisStream.KINESIS_STREAM_NAME, "test-stream");
runner.setProperty(ConsumeKinesisStream.APPLICATION_NAME, "test-application");
// use anonymous credentials by default
final ControllerService credentialsProvider = new AWSCredentialsProviderControllerService();
runner.addControllerService("credentials-provider", credentialsProvider);
runner.setProperty(credentialsProvider, CredentialPropertyDescriptors.USE_ANONYMOUS_CREDENTIALS, "true");
runner.assertValid(credentialsProvider);
runner.enableControllerService(credentialsProvider);
runner.setProperty(ConsumeKinesisStream.AWS_CREDENTIALS_PROVIDER_SERVICE, "credentials-provider");
runner.assertValid();
}
@Test
public void testValidWithCredentials() throws InitializationException {
final ControllerService credentialsProvider = new AWSCredentialsProviderControllerService();
runner.addControllerService("credentials-provider", credentialsProvider);
runner.setProperty(credentialsProvider, CredentialPropertyDescriptors.ACCESS_KEY, "access-key");
runner.setProperty(credentialsProvider, CredentialPropertyDescriptors.SECRET_KEY, "secret-key");
runner.assertValid(credentialsProvider);
runner.enableControllerService(credentialsProvider);
runner.setProperty(ConsumeKinesisStream.AWS_CREDENTIALS_PROVIDER_SERVICE, "credentials-provider");
runner.assertValid();
((ConsumeKinesisStream) runner.getProcessor()).onScheduled(runner.getProcessContext());
assertThat(runner.getLogger().getInfoMessages().stream()
.anyMatch(logMessage -> logMessage.getMsg().endsWith("Creating client using aws credentials provider")), is(true));
// "raw" credentials aren't used
assertThat(runner.getLogger().getInfoMessages().stream()
.anyMatch(logMessage -> logMessage.getMsg().endsWith("Creating client using aws credentials")), is(false));
}
@Test
public void testMissingMandatoryProperties() {
runner.removeProperty(ConsumeKinesisStream.KINESIS_STREAM_NAME);
runner.removeProperty(ConsumeKinesisStream.APPLICATION_NAME);
runner.removeProperty(ConsumeKinesisStream.AWS_CREDENTIALS_PROVIDER_SERVICE);
runner.assertNotValid();
final AssertionError assertionError = assertThrows(AssertionError.class, runner::run);
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 3 validation failures:\n" +
"'%s' is invalid because %s is required\n" +
"'%s' is invalid because %s is required\n" +
"'%s' is invalid because %s is required\n",
ConsumeKinesisStream.KINESIS_STREAM_NAME.getDisplayName(), ConsumeKinesisStream.KINESIS_STREAM_NAME.getDisplayName(),
ConsumeKinesisStream.APPLICATION_NAME.getDisplayName(), ConsumeKinesisStream.APPLICATION_NAME.getDisplayName(),
ConsumeKinesisStream.AWS_CREDENTIALS_PROVIDER_SERVICE.getDisplayName(), ConsumeKinesisStream.AWS_CREDENTIALS_PROVIDER_SERVICE.getDisplayName()
)));
}
@Test
public void testInvalidProperties() {
runner.setProperty(ConsumeKinesisStream.APPLICATION_NAME, " ");
runner.setProperty(ConsumeKinesisStream.TIMESTAMP_FORMAT, "not-valid-format");
runner.setProperty(ConsumeKinesisStream.RETRY_WAIT, "not-a-long");
runner.setProperty(ConsumeKinesisStream.NUM_RETRIES, "not-an-int");
runner.setProperty(ConsumeKinesisStream.FAILOVER_TIMEOUT, "not-a-period");
runner.setProperty(ConsumeKinesisStream.GRACEFUL_SHUTDOWN_TIMEOUT, "not-a-period");
runner.setProperty(ConsumeKinesisStream.CHECKPOINT_INTERVAL, "not-a-long");
runner.setProperty(ConsumeKinesisStream.REPORT_CLOUDWATCH_METRICS, "not-a-boolean");
runner.setProperty(ConsumeKinesisStream.DYNAMODB_ENDPOINT_OVERRIDE, "not-a-url");
runner.setProperty(ConsumeKinesisStream.INITIAL_STREAM_POSITION, "not-an-enum-match");
runner.setProperty(ConsumeKinesisStream.RECORD_READER, "not-a-reader");
runner.setProperty(ConsumeKinesisStream.RECORD_WRITER, "not-a-writer");
runner.assertNotValid();
final AssertionError assertionError = assertThrows(AssertionError.class, runner::run);
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 14 validation failures:\n" +
"'%s' validated against ' ' is invalid because %s must contain at least one character that is not white space\n" +
"'%s' validated against 'not-a-reader' is invalid because Property references a Controller Service that does not exist\n" +
"'%s' validated against 'not-a-writer' is invalid because Property references a Controller Service that does not exist\n" +
"'%s' validated against 'not-a-url' is invalid because Not a valid URL\n" +
"'%s' validated against 'not-an-enum-match' is invalid because Given value not found in allowed set '%s, %s, %s'\n" +
"'%s' validated against 'not-valid-format' is invalid because Must be a valid java.time.DateTimeFormatter pattern, e.g. %s\n" +
"'%s' validated against 'not-a-period' is invalid because Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and " +
"TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n" +
"'%s' validated against 'not-a-period' is invalid because Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and " +
"TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n" +
"'%s' validated against 'not-a-long' is invalid because Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and " +
"TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n" +
"'%s' validated against 'not-an-int' is invalid because not a valid integer\n" +
"'%s' validated against 'not-a-long' is invalid because Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and " +
"TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n" +
"'%s' validated against 'not-a-boolean' is invalid because Given value not found in allowed set 'true, false'\n" +
"'%s' validated against 'not-a-reader' is invalid because Invalid Controller Service: not-a-reader is not a valid Controller Service Identifier\n" +
"'%s' validated against 'not-a-writer' is invalid because Invalid Controller Service: not-a-writer is not a valid Controller Service Identifier\n",
ConsumeKinesisStream.APPLICATION_NAME.getName(), ConsumeKinesisStream.APPLICATION_NAME.getName(),
ConsumeKinesisStream.RECORD_READER.getDisplayName(),
ConsumeKinesisStream.RECORD_WRITER.getDisplayName(),
ConsumeKinesisStream.DYNAMODB_ENDPOINT_OVERRIDE.getName(),
ConsumeKinesisStream.INITIAL_STREAM_POSITION.getName(), ConsumeKinesisStream.LATEST.getDisplayName(),
ConsumeKinesisStream.TRIM_HORIZON.getDisplayName(), ConsumeKinesisStream.AT_TIMESTAMP.getDisplayName(),
ConsumeKinesisStream.TIMESTAMP_FORMAT.getName(), RecordFieldType.TIMESTAMP.getDefaultFormat(),
ConsumeKinesisStream.FAILOVER_TIMEOUT.getName(),
ConsumeKinesisStream.GRACEFUL_SHUTDOWN_TIMEOUT.getName(),
ConsumeKinesisStream.CHECKPOINT_INTERVAL.getName(),
ConsumeKinesisStream.NUM_RETRIES.getName(),
ConsumeKinesisStream.RETRY_WAIT.getName(),
ConsumeKinesisStream.REPORT_CLOUDWATCH_METRICS.getName(),
ConsumeKinesisStream.RECORD_READER.getDisplayName(),
ConsumeKinesisStream.RECORD_WRITER.getDisplayName()
)));
}
@Test
public void testMissingStreamPositionTimestamp() {
runner.setProperty(ConsumeKinesisStream.INITIAL_STREAM_POSITION, InitialPositionInStream.AT_TIMESTAMP.toString());
runner.removeProperty(ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP);
runner.assertNotValid();
final AssertionError assertionError = assertThrows(AssertionError.class, runner::run);
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 1 validation failures:\n" +
"'%s' is invalid because %s must be provided when %s is %s\n",
ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP.getName(), ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP.getDisplayName(),
ConsumeKinesisStream.INITIAL_STREAM_POSITION.getDisplayName(), InitialPositionInStream.AT_TIMESTAMP
)));
}
@Test
public void testInvalidStreamPositionTimestamp() {
runner.setProperty(ConsumeKinesisStream.INITIAL_STREAM_POSITION, InitialPositionInStream.AT_TIMESTAMP.toString());
runner.setProperty(ConsumeKinesisStream.TIMESTAMP_FORMAT, "yyyy-MM-dd");
runner.setProperty(ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP, "12:00:00");
runner.assertNotValid();
final AssertionError assertionError = assertThrows(AssertionError.class, runner::run);
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 1 validation failures:\n" +
"'%s' is invalid because %s must be parsable by %s\n",
ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP.getName(),
ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP.getDisplayName(),
ConsumeKinesisStream.TIMESTAMP_FORMAT.getDisplayName()
)));
}
@Test
public void testInvalidRecordReaderWithoutRecordWriter() throws InitializationException {
final ControllerService service = new JsonTreeReader();
runner.addControllerService("record-reader", service);
runner.enableControllerService(service);
runner.setProperty(ConsumeKinesisStream.RECORD_READER, "record-reader");
runner.removeProperty(ConsumeKinesisStream.RECORD_WRITER);
runner.assertNotValid();
final AssertionError assertionError = assertThrows(AssertionError.class, runner::assertValid);
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 1 validation failures:\n" +
"'%s' is invalid because %s must be set if %s is set in order to write FlowFiles as Records.\n",
ConsumeKinesisStream.RECORD_WRITER.getName(),
ConsumeKinesisStream.RECORD_WRITER.getDisplayName(),
ConsumeKinesisStream.RECORD_READER.getDisplayName()
)));
}
@Test
public void testInvalidRecordWriterWithoutRecordReader() throws InitializationException {
final ControllerService service = new JsonRecordSetWriter();
runner.addControllerService("record-writer", service);
runner.enableControllerService(service);
runner.setProperty(ConsumeKinesisStream.RECORD_WRITER, "record-writer");
runner.removeProperty(ConsumeKinesisStream.RECORD_READER);
runner.assertNotValid();
final AssertionError assertionError = assertThrows(AssertionError.class, runner::assertValid);
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 1 validation failures:\n" +
"'%s' is invalid because %s must be set if %s is set in order to write FlowFiles as Records.\n",
ConsumeKinesisStream.RECORD_READER.getName(),
ConsumeKinesisStream.RECORD_READER.getDisplayName(),
ConsumeKinesisStream.RECORD_WRITER.getDisplayName()
)));
}
@Test
public void testRunWorkerWithCredentials() throws UnknownHostException, InitializationException, InterruptedException {
runWorker(true, false);
}
@Test
public void testRunWorkerUnexpectedShutdown() throws UnknownHostException, InitializationException, InterruptedException {
runWorker(true, true);
}
@Test
public void testRunWorkerWithoutCredentials() throws UnknownHostException, InitializationException, InterruptedException {
runWorker(false, false);
}
@Test
public void testInvalidDynamicKCLProperties() {
// blank properties
runner.setProperty("", "empty");
runner.setProperty(" ", "blank");
// invalid property names
runner.setProperty("withPrefixNotAllowed", "a-value");
runner.setProperty("unknownProperty", "a-third-value");
runner.setProperty("toString", "cannot-call");
// invalid property names (cannot use nested/indexed/mapped properties via BeanUtils)
runner.setProperty("no.allowed", "no-.");
runner.setProperty("no[allowed", "no-[");
runner.setProperty("no]allowed", "no-]");
runner.setProperty("no(allowed", "no-(");
runner.setProperty("no)allowed", "no-)");
// can't override static properties
runner.setProperty("regionName", Regions.AF_SOUTH_1.getName());
runner.setProperty("timestampAtInitialPositionInStream", "2021-01-01 00:00:00");
runner.setProperty("initialPositionInStream", "AT_TIMESTAMP");
runner.setProperty("dynamoDBEndpoint", "http://localhost:4566/dynamodb");
runner.setProperty("kinesisEndpoint", "http://localhost:4566/kinesis");
// invalid parameter conversions
runner.setProperty("dynamoDBClientConfig", "too-complex");
runner.setProperty("shutdownGraceMillis", "not-long");
final AssertionError ae = assertThrows(AssertionError.class, runner::assertValid);
assertThat(ae.getMessage(), startsWith("Processor has 17 validation failures:\n"));
// blank properties
assertThat(ae.getMessage(), containsString("'Property Name' validated against '' is invalid because Invalid attribute key: <Empty String>\n"));
assertThat(ae.getMessage(), containsString("'Property Name' validated against ' ' is invalid because Invalid attribute key: <Empty String>\n"));
// invalid property names
assertThat(ae.getMessage(), containsString(
"'withPrefixNotAllowed' validated against 'a-value' is invalid because Property name must not have a prefix of \"with\", " +
"must start with a letter and contain only letters, numbers or underscores\n"
));
assertThat(ae.getMessage(), containsString(
"'unknownProperty' validated against 'a-third-value' is invalid because Kinesis Client Library Configuration property with name " +
"UnknownProperty does not exist or is not writable\n"
));
assertThat(ae.getMessage(), containsString(
"'toString' validated against 'cannot-call' is invalid because Kinesis Client Library Configuration property with name " +
"ToString does not exist or is not writable\n"
));
// invalid property names (cannot use nested/indexed/mapped properties via BeanUtils)
assertThat(ae.getMessage(), containsString(
"'no.allowed' validated against 'no-.' is invalid because Property name must not have a prefix of \"with\", " +
"must start with a letter and contain only letters, numbers or underscores\n"
));
assertThat(ae.getMessage(), containsString(
"'no[allowed' validated against 'no-[' is invalid because Property name must not have a prefix of \"with\", " +
"must start with a letter and contain only letters, numbers or underscores\n"
));
assertThat(ae.getMessage(), containsString(
"'no]allowed' validated against 'no-]' is invalid because Property name must not have a prefix of \"with\", " +
"must start with a letter and contain only letters, numbers or underscores\n"
));
assertThat(ae.getMessage(), containsString(
"'no(allowed' validated against 'no-(' is invalid because Property name must not have a prefix of \"with\", " +
"must start with a letter and contain only letters, numbers or underscores\n"
));
assertThat(ae.getMessage(), containsString(
"'no)allowed' validated against 'no-)' is invalid because Property name must not have a prefix of \"with\", " +
"must start with a letter and contain only letters, numbers or underscores\n"
));
// can't override static properties
assertThat(ae.getMessage(), containsString("'regionName' validated against 'af-south-1' is invalid because Use \"Region\" instead of a dynamic property\n"));
assertThat(ae.getMessage(), containsString(
"'timestampAtInitialPositionInStream' validated against '2021-01-01 00:00:00' is invalid because Use \"Stream Position Timestamp\" instead of a dynamic property\n"
));
assertThat(ae.getMessage(), containsString(
"'initialPositionInStream' validated against 'AT_TIMESTAMP' is invalid because Use \"Initial Stream Position\" instead of a dynamic property\n"
));
assertThat(ae.getMessage(), containsString(
"'dynamoDBEndpoint' validated against 'http://localhost:4566/dynamodb' is invalid because Use \"DynamoDB Override\" instead of a dynamic property\n"
));
assertThat(ae.getMessage(), containsString(
"'kinesisEndpoint' validated against 'http://localhost:4566/kinesis' is invalid because Use \"Endpoint Override URL\" instead of a dynamic property\n"
));
// invalid parameter conversions
assertThat(ae.getMessage(), containsString(
"'dynamoDBClientConfig' validated against 'too-complex' is invalid because Kinesis Client Library Configuration property " +
"with name DynamoDBClientConfig cannot be used with value \"too-complex\" : " +
"Cannot invoke com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.withDynamoDBClientConfig on bean class " +
"'class com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration' - argument type mismatch - " +
"had objects of type \"java.lang.String\" but expected signature \"com.amazonaws.ClientConfiguration\"\n"
));
assertThat(ae.getMessage(), containsString("'shutdownGraceMillis' validated against 'not-long' is invalid because " +
"Kinesis Client Library Configuration property with name ShutdownGraceMillis " +
"cannot be used with value \"not-long\" : Value of ShutdownGraceMillis should be positive, but current value is 0\n"));
}
@Test
public void testValidDynamicKCLProperties() {
runner.setProperty("billingMode", "PROVISIONED"); // enum
runner.setProperty("idleMillisBetweenCalls", "1000"); // long
runner.setProperty("cleanupLeasesUponShardCompletion", "true"); // boolean
runner.setProperty("initialLeaseTableReadCapacity", "1"); // int
runner.setProperty("DataFetchingStrategy", "DEFAULT"); // String with uppercase leading character in property name
runner.assertValid();
}
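// Illustrative sketch of the assumed mapping (not part of this commit): each dynamic property
// name appears to be capitalised and prefixed with "with" to locate the fluent setter on
// KinesisClientLibConfiguration, which is then invoked reflectively via commons-beanutils with
// the String value converted to the parameter type, e.g.:
//   "billingMode" -> kinesisClientLibConfiguration.withBillingMode(BillingMode.PROVISIONED)
// This is why the validation messages above reject names that already start with "with" or
// whose values cannot be converted from a String.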
/*
* Trigger a run of the ConsumeKinesisStream processor, but expect the KCL Worker to fail (it needs connections to AWS resources)
* Assert that our code is being called by checking log output. The ITConsumeKinesisStream integration tests prove actual AWS connectivity.
*/
private void runWorker(final boolean withCredentials, final boolean waitForFailure) throws UnknownHostException, InitializationException, InterruptedException {
final TestRunner mockConsumeKinesisStreamRunner = TestRunners.newTestRunner(MockConsumeKinesisStream.class);
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.KINESIS_STREAM_NAME, "test-stream");
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.APPLICATION_NAME, "test-application");
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.REGION, Regions.EU_WEST_2.getName());
final AWSCredentialsProviderService awsCredentialsProviderService = new AWSCredentialsProviderControllerService();
mockConsumeKinesisStreamRunner.addControllerService("aws-credentials", awsCredentialsProviderService);
if (withCredentials) {
mockConsumeKinesisStreamRunner.setProperty(awsCredentialsProviderService, CredentialPropertyDescriptors.ACCESS_KEY, "test-access");
mockConsumeKinesisStreamRunner.setProperty(awsCredentialsProviderService, CredentialPropertyDescriptors.SECRET_KEY, "test-secret");
} else {
mockConsumeKinesisStreamRunner.setProperty(awsCredentialsProviderService, CredentialPropertyDescriptors.USE_ANONYMOUS_CREDENTIALS, "true");
}
mockConsumeKinesisStreamRunner.assertValid(awsCredentialsProviderService);
mockConsumeKinesisStreamRunner.enableControllerService(awsCredentialsProviderService);
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.AWS_CREDENTIALS_PROVIDER_SERVICE, "aws-credentials");
// speed up init process for the unit test (and show use of dynamic properties to configure KCL)
mockConsumeKinesisStreamRunner.setProperty("parentShardPollIntervalMillis", "1");
mockConsumeKinesisStreamRunner.assertValid();
// start the processor (but don't auto-shutdown to give Worker initialisation a chance to progress)
mockConsumeKinesisStreamRunner.run(1, false);
final MockConsumeKinesisStream processor = ((MockConsumeKinesisStream) mockConsumeKinesisStreamRunner.getProcessor());
// WorkerState should get to INITIALIZING pretty quickly, but there's a chance it will still be at CREATED by the time we get here
assertThat(processor.workerState.get(), anyOf(equalTo(WorkerStateChangeListener.WorkerState.INITIALIZING), equalTo(WorkerStateChangeListener.WorkerState.CREATED)));
final String hostname = InetAddress.getLocalHost().getCanonicalHostName();
assertKinesisClientLibConfiguration(processor.kinesisClientLibConfiguration, withCredentials, hostname);
assertThat(processor.workerBuilder.build().getApplicationName(), equalTo("test-application"));
// confirm the Kinesis Worker initialisation was attempted
assertThat(mockConsumeKinesisStreamRunner.getLogger().getInfoMessages().stream()
.anyMatch(logMessage -> logMessage.getMsg().contains(String.format(
"Kinesis Worker prepared for application %s to process stream %s as worker ID %s:",
"test-application", "test-stream", hostname
))), is(true));
// confirm the processor worked through the onTrigger method (and that the stopConsuming method was not executed)
assertThat(mockConsumeKinesisStreamRunner.getLogger().getInfoMessages().stream()
.anyMatch(logMessage -> logMessage.getMsg().contains(String.format("Starting Kinesis Worker %s", hostname))), is(true));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getInfoMessages().stream()
.noneMatch(logMessage -> logMessage.getMsg().endsWith("Requesting Kinesis Worker shutdown")), is(true));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getInfoMessages().stream()
.noneMatch(logMessage -> logMessage.getMsg().endsWith("Kinesis Worker shutdown")), is(true));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getWarnMessages().isEmpty(), is(true));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getErrorMessages().isEmpty(), is(true));
if (!waitForFailure) {
// re-trigger the processor to ensure the Worker isn't re-initialised when already running
mockConsumeKinesisStreamRunner.run(1, false, false);
assertTrue(((MockProcessContext) mockConsumeKinesisStreamRunner.getProcessContext()).isYieldCalled());
// "Starting" log count remains at 1 from the initial startup above (the Logger doesn't get reset between processor calls)
assertThat(mockConsumeKinesisStreamRunner.getLogger().getInfoMessages().stream()
.filter(logMessage -> logMessage.getMsg().contains(String.format("Starting Kinesis Worker %s", hostname))).count(), is(1L));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getWarnMessages().isEmpty(), is(true));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getErrorMessages().isEmpty(), is(true));
// stop the processor
mockConsumeKinesisStreamRunner.stop();
// confirm the processor worked through the stopConsuming method
assertThat(mockConsumeKinesisStreamRunner.getLogger().getInfoMessages().stream()
.anyMatch(logMessage -> logMessage.getMsg().endsWith("Requesting Kinesis Worker shutdown")), is(true));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getInfoMessages().stream()
.anyMatch(logMessage -> logMessage.getMsg().endsWith("Kinesis Worker shutdown")), is(true));
// LeaseCoordinator doesn't start up properly (it can't create its DynamoDB table during a unit test) and therefore has a problem during shutdown
assertThat(mockConsumeKinesisStreamRunner.getLogger().getWarnMessages().size(), is(2));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getWarnMessages().stream()
.anyMatch(logMessage -> logMessage.getMsg().endsWith(
"Problem while shutting down Kinesis Worker: java.lang.NullPointerException: java.util.concurrent.ExecutionException: java.lang.NullPointerException"
)), is(true));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getWarnMessages().stream()
.anyMatch(logMessage -> logMessage.getMsg().endsWith("One or more problems while shutting down Kinesis Worker, see logs for details")), is(true));
assertThat(mockConsumeKinesisStreamRunner.getLogger().getErrorMessages().isEmpty(), is(true));
} else {
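// the KCL Worker runs on a background thread, so re-trigger the processor until its
// unexpected shutdown surfaces from onTrigger as a ProcessException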
for (int runs = 0; runs < 10; runs++) {
try {
mockConsumeKinesisStreamRunner.run(1, false, false);
Thread.sleep(1_000);
} catch (AssertionError e) {
assertThat(e.getCause(), instanceOf(ProcessException.class));
assertThat(e.getCause().getMessage(), equalTo("Worker has shutdown unexpectedly, possibly due to a configuration issue; check logs for details"));
assertTrue(((MockProcessContext) mockConsumeKinesisStreamRunner.getProcessContext()).isYieldCalled());
break;
}
}
}
}
private void assertKinesisClientLibConfiguration(final KinesisClientLibConfiguration kinesisClientLibConfiguration,
final boolean withCredentials, final String hostname) {
assertThat(kinesisClientLibConfiguration.getWorkerIdentifier(), startsWith(hostname));
assertThat(kinesisClientLibConfiguration.getApplicationName(), equalTo("test-application"));
assertThat(kinesisClientLibConfiguration.getStreamName(), equalTo("test-stream"));
if (withCredentials) {
assertThat(kinesisClientLibConfiguration.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId(), equalTo("test-access"));
assertThat(kinesisClientLibConfiguration.getKinesisCredentialsProvider().getCredentials().getAWSSecretKey(), equalTo("test-secret"));
assertThat(kinesisClientLibConfiguration.getDynamoDBCredentialsProvider().getCredentials().getAWSAccessKeyId(), equalTo("test-access"));
assertThat(kinesisClientLibConfiguration.getDynamoDBCredentialsProvider().getCredentials().getAWSSecretKey(), equalTo("test-secret"));
assertThat(kinesisClientLibConfiguration.getCloudWatchCredentialsProvider().getCredentials().getAWSAccessKeyId(), equalTo("test-access"));
assertThat(kinesisClientLibConfiguration.getCloudWatchCredentialsProvider().getCredentials().getAWSSecretKey(), equalTo("test-secret"));
} else {
assertThat(kinesisClientLibConfiguration.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId(), nullValue());
assertThat(kinesisClientLibConfiguration.getKinesisCredentialsProvider().getCredentials().getAWSSecretKey(), nullValue());
assertThat(kinesisClientLibConfiguration.getDynamoDBCredentialsProvider().getCredentials().getAWSAccessKeyId(), nullValue());
assertThat(kinesisClientLibConfiguration.getDynamoDBCredentialsProvider().getCredentials().getAWSSecretKey(), nullValue());
assertThat(kinesisClientLibConfiguration.getCloudWatchCredentialsProvider().getCredentials().getAWSAccessKeyId(), nullValue());
assertThat(kinesisClientLibConfiguration.getCloudWatchCredentialsProvider().getCredentials().getAWSSecretKey(), nullValue());
}
assertThat(kinesisClientLibConfiguration.getRegionName(), equalTo(Regions.EU_WEST_2.getName()));
assertThat(kinesisClientLibConfiguration.getInitialPositionInStream(), equalTo(InitialPositionInStream.LATEST));
assertThat(kinesisClientLibConfiguration.getDynamoDBEndpoint(), nullValue());
assertThat(kinesisClientLibConfiguration.getKinesisEndpoint(), nullValue());
assertThat(kinesisClientLibConfiguration.getKinesisClientConfiguration(), instanceOf(ClientConfiguration.class));
assertThat(kinesisClientLibConfiguration.getDynamoDBClientConfiguration(), instanceOf(ClientConfiguration.class));
assertThat(kinesisClientLibConfiguration.getCloudWatchClientConfiguration(), instanceOf(ClientConfiguration.class));
assertThat(kinesisClientLibConfiguration.getParentShardPollIntervalMillis(), equalTo(1L));
}
// public so TestRunners is able to see and instantiate the class for the tests
public static class MockConsumeKinesisStream extends ConsumeKinesisStream {
// capture the WorkerBuilder and KinesisClientLibConfiguration for unit test assertions
KinesisClientLibConfiguration kinesisClientLibConfiguration;
Worker.Builder workerBuilder;
@Override
Worker.Builder prepareWorkerBuilder(final ProcessContext context, final KinesisClientLibConfiguration kinesisClientLibConfiguration,
final IRecordProcessorFactory factory) {
workerBuilder = super.prepareWorkerBuilder(context, kinesisClientLibConfiguration, factory);
return workerBuilder;
}
@Override
KinesisClientLibConfiguration prepareKinesisClientLibConfiguration(final ProcessContext context, final String workerId) {
kinesisClientLibConfiguration = super.prepareKinesisClientLibConfiguration(context, workerId);
return kinesisClientLibConfiguration;
}
}
}

View File

@ -16,11 +16,6 @@
*/
package org.apache.nifi.processors.aws.kinesis.stream;
import static org.junit.Assert.assertNotNull;
import java.util.List;
import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -28,6 +23,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertNotNull;
public class TestPutKinesisStream {
private TestRunner runner;
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";

View File

@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream.record;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class TestAbstractKinesisRecordProcessor {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
private final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
@Mock
private ProcessSessionFactory processSessionFactory;
private final MockProcessSession session = new MockProcessSession(new SharedSessionState(runner.getProcessor(), new AtomicLong(0)), runner.getProcessor());
private AbstractKinesisRecordProcessor fixture;
@Mock
private IRecordProcessorCheckpointer checkpointer;
@Mock
private Record kinesisRecord;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(processSessionFactory.createSession()).thenReturn(session);
// default test fixture will try operations twice with very little wait in between
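// constructor arguments, in assumed order: sessionFactory, logger, streamName, endpointPrefix,
// endpointOverride (null here), checkpointIntervalMillis, retryWaitMillis, numRetries, dateTimeFormatter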
fixture = new AbstractKinesisRecordProcessor(processSessionFactory, runner.getLogger(), "kinesis-test",
"endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER) {
@Override
void processRecord(List<FlowFile> flowFiles, Record kinesisRecord, boolean lastRecord, ProcessSession session, StopWatch stopWatch) {
// intentionally blank
}
};
}
@After
public void tearDown() {
verifyNoMoreInteractions(checkpointer, kinesisRecord, processSessionFactory);
reset(checkpointer, kinesisRecord, processSessionFactory);
}
@Test
public void testInitialisation() {
final ExtendedSequenceNumber esn = new ExtendedSequenceNumber(InitialPositionInStream.AT_TIMESTAMP.toString(), 123L);
final InitializationInput initializationInput = new InitializationInput()
.withExtendedSequenceNumber(esn)
.withShardId("shard-id");
fixture.initialize(initializationInput);
assertThat(fixture.getNextCheckpointTimeInMillis() > System.currentTimeMillis(), is(true));
assertThat(fixture.getKinesisShardId(), equalTo("shard-id"));
// DEBUG messages don't have their fields replaced in the MockComponentLog
assertThat(runner.getLogger().getDebugMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Initializing record processor for stream: {} / shard: {}; from sequence number: {}")), is(true));
}
@Test
public void testInitialisationWithPendingCheckpoint() {
final ExtendedSequenceNumber esn = new ExtendedSequenceNumber(InitialPositionInStream.AT_TIMESTAMP.toString(), 123L);
final ExtendedSequenceNumber prev = new ExtendedSequenceNumber(InitialPositionInStream.LATEST.toString(), 456L);
final InitializationInput initializationInput = new InitializationInput()
.withExtendedSequenceNumber(esn)
.withPendingCheckpointSequenceNumber(prev)
.withShardId("shard-id");
fixture.initialize(initializationInput);
assertThat(fixture.getNextCheckpointTimeInMillis() > System.currentTimeMillis(), is(true));
assertThat(fixture.getKinesisShardId(), equalTo("shard-id"));
assertThat(runner.getLogger().getWarnMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.contains(String.format(
"Initializing record processor for stream: %s / shard %s; from sequence number: %s; indicates previously uncheckpointed sequence number: %s",
"kinesis-test", "shard-id", esn, prev
))), is(true));
}
@Test
public void testShutdown() throws InvalidStateException, ShutdownException {
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.REQUESTED)
.withCheckpointer(checkpointer);
fixture.setKinesisShardId("test-shard");
fixture.shutdown(shutdownInput);
verify(checkpointer, times(1)).checkpoint();
assertThat(runner.getLogger().getDebugMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Checkpointing shard test-shard")), is(true));
}
@Test
public void testShutdownWithThrottlingFailures() throws InvalidStateException, ShutdownException {
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.REQUESTED)
.withCheckpointer(checkpointer);
doThrow(new ThrottlingException("throttled")).when(checkpointer).checkpoint();
fixture.shutdown(shutdownInput);
verify(checkpointer, times(2)).checkpoint();
assertThat(runner.getLogger().getWarnMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith(String.format(
"Transient issue when checkpointing - attempt %d of %d: %s: %s",
1, 2, ThrottlingException.class.getName(), "throttled"
))), is(true));
// ERROR messages don't have their fields replaced in the MockComponentLog
assertThat(runner.getLogger().getErrorMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Checkpoint failed after {} attempts.: {}")), is(true));
}
@Test
public void testShutdownWithShutdownFailure() throws InvalidStateException, ShutdownException {
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.REQUESTED)
.withCheckpointer(checkpointer);
doThrow(new ShutdownException("shutdown")).when(checkpointer).checkpoint();
fixture.shutdown(shutdownInput);
verify(checkpointer, times(1)).checkpoint();
assertThat(runner.getLogger().getInfoMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Caught shutdown exception, skipping checkpoint.")), is(true));
}
@Test
public void testShutdownWithInvalidStateFailure() throws InvalidStateException, ShutdownException {
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.ZOMBIE)
.withCheckpointer(checkpointer);
doThrow(new InvalidStateException("invalid state")).when(checkpointer).checkpoint();
fixture.shutdown(shutdownInput);
verify(checkpointer, times(1)).checkpoint();
assertThat(runner.getLogger().getErrorMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.")), is(true));
}
@Test
public void testShutdownTerminateRecordsNotProcessing() throws InvalidStateException, ShutdownException {
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.TERMINATE)
.withCheckpointer(checkpointer);
fixture.setKinesisShardId("test-shard");
fixture.setProcessingRecords(false);
fixture.shutdown(shutdownInput);
verify(checkpointer, times(1)).checkpoint();
assertThat(runner.getLogger().getDebugMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Checkpointing shard test-shard")), is(true));
// DEBUG messages don't have their fields replaced in the MockComponentLog
assertThat(runner.getLogger().getDebugMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Shutting down Record Processor for shard: {} with reason: {}")), is(true));
// no waiting loop when records aren't processing
assertThat(runner.getLogger().getDebugMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Record Processor for shard {} still processing records, waiting before shutdown")), is(false));
assertThat(runner.getLogger().getWarnMessages().size(), is(0));
}
@Test
public void testShutdownTerminateRecordsProcessing() throws InvalidStateException, ShutdownException {
final ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.TERMINATE)
.withCheckpointer(checkpointer);
fixture.setKinesisShardId("test-shard");
fixture.setProcessingRecords(true);
fixture.shutdown(shutdownInput);
verify(checkpointer, times(1)).checkpoint();
assertThat(runner.getLogger().getDebugMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Checkpointing shard test-shard")), is(true));
// DEBUG messages don't have their fields replaced in the MockComponentLog
assertThat(runner.getLogger().getDebugMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Shutting down Record Processor for shard: {} with reason: {}")), is(true));
// wait loop when records are processing
assertThat(runner.getLogger().getDebugMessages().stream().filter(logMessage -> logMessage.getMsg()
.endsWith("Record Processor for shard {} still processing records, waiting before shutdown"))
.count(), is(2L));
assertThat(runner.getLogger().getWarnMessages().stream().filter(logMessage -> logMessage.getMsg()
.endsWith("Record Processor for shard test-shard still running, but maximum wait time elapsed, checkpoint will be attempted"))
.count(), is(1L));
}
}

View File

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream.record;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class TestKinesisRecordProcessorRaw {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
private final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
@Mock
private ProcessSessionFactory processSessionFactory;
private final SharedSessionState sharedState = new SharedSessionState(runner.getProcessor(), new AtomicLong(0));
private final MockProcessSession session = new MockProcessSession(sharedState, runner.getProcessor());
private AbstractKinesisRecordProcessor fixture;
@Mock
private IRecordProcessorCheckpointer checkpointer;
@Mock
private Record kinesisRecord;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
// default test fixture will try operations twice with very little wait in between
fixture = new KinesisRecordProcessorRaw(processSessionFactory, runner.getLogger(), "kinesis-test",
"endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER);
}
@After
public void tearDown() {
verifyNoMoreInteractions(checkpointer, kinesisRecord, processSessionFactory);
reset(checkpointer, kinesisRecord, processSessionFactory);
}
@Test
public void testProcessRecordsEmpty() {
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(Collections.emptyList())
.withCheckpointer(checkpointer)
.withCacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
.withCacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
.withMillisBehindLatest(100L);
// would checkpoint (but should skip because there are no records processed)
fixture.setNextCheckpointTimeInMillis(System.currentTimeMillis() - 10_000L);
fixture.processRecords(processRecordsInput);
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 0);
assertThat(sharedState.getProvenanceEvents().size(), is(0));
session.assertNotCommitted();
session.assertNotRolledBack();
}
@Test
public void testProcessRecordsNoCheckpoint() {
processMultipleRecordsAssertProvenance(false);
}
@Test
public void testProcessRecordsWithEndpointOverride() {
processMultipleRecordsAssertProvenance(true);
}
private void processMultipleRecordsAssertProvenance(final boolean endpointOverridden) {
final Date firstDate = Date.from(Instant.now().minus(1, ChronoUnit.MINUTES));
final Date secondDate = new Date();
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(Arrays.asList(
new Record().withApproximateArrivalTimestamp(firstDate)
.withPartitionKey("partition-1")
.withSequenceNumber("1")
.withData(ByteBuffer.wrap("record-1".getBytes(StandardCharsets.UTF_8))),
new Record().withApproximateArrivalTimestamp(secondDate)
.withPartitionKey("partition-2")
.withSequenceNumber("2")
.withData(ByteBuffer.wrap("record-2".getBytes(StandardCharsets.UTF_8))),
new Record().withApproximateArrivalTimestamp(null)
.withPartitionKey("partition-no-date")
.withSequenceNumber("no-date")
.withData(ByteBuffer.wrap("record-no-date".getBytes(StandardCharsets.UTF_8)))
))
.withCheckpointer(checkpointer)
.withCacheEntryTime(null)
.withCacheExitTime(null)
.withMillisBehindLatest(null);
final String transitUriPrefix = endpointOverridden ? "https://another-endpoint.com:8443" : "http://endpoint-prefix.amazonaws.com";
if (endpointOverridden) {
fixture = new KinesisRecordProcessorRaw(processSessionFactory, runner.getLogger(), "kinesis-test",
"endpoint-prefix", "https://another-endpoint.com:8443", 10_000L, 1L, 2, DATE_TIME_FORMATTER);
}
// skip checkpoint
fixture.setNextCheckpointTimeInMillis(System.currentTimeMillis() + 10_000L);
fixture.setKinesisShardId("test-shard");
when(processSessionFactory.createSession()).thenReturn(session);
fixture.processRecords(processRecordsInput);
verify(processSessionFactory, times(1)).createSession();
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, processRecordsInput.getRecords().size());
assertThat(sharedState.getProvenanceEvents().size(), is(processRecordsInput.getRecords().size()));
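// raw output produces one provenance event per record, with transit URI <endpoint>/<shard-id>/<partition-key>#<sequence-number>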
assertThat(sharedState.getProvenanceEvents().get(0).getTransitUri(), is(String.format("%s/test-shard/partition-1#1", transitUriPrefix)));
assertThat(sharedState.getProvenanceEvents().get(1).getTransitUri(), is(String.format("%s/test-shard/partition-2#2", transitUriPrefix)));
assertThat(sharedState.getProvenanceEvents().get(2).getTransitUri(), is(String.format("%s/test-shard/partition-no-date#no-date", transitUriPrefix)));
final List<MockFlowFile> flowFiles = session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
assertFlowFile(flowFiles.get(0), firstDate, "partition-1", "1", "record-1");
assertFlowFile(flowFiles.get(1), secondDate, "partition-2", "2", "record-2");
assertFlowFile(flowFiles.get(2), null, "partition-no-date", "no-date", "record-no-date");
session.assertCommitted();
session.assertNotRolledBack();
}
@Test
public void testProcessPoisonPillRecordWithCheckpoint() throws ShutdownException, InvalidStateException {
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(Arrays.asList(
new Record().withApproximateArrivalTimestamp(null)
.withPartitionKey("partition-1")
.withSequenceNumber("1")
.withData(ByteBuffer.wrap("record-1".getBytes(StandardCharsets.UTF_8))),
kinesisRecord,
new Record().withApproximateArrivalTimestamp(null)
.withPartitionKey("partition-3")
.withSequenceNumber("3")
.withData(ByteBuffer.wrap("record-3".getBytes(StandardCharsets.UTF_8)))
))
.withCheckpointer(checkpointer)
.withCacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
.withCacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
.withMillisBehindLatest(100L);
when(kinesisRecord.getData()).thenThrow(new IllegalStateException("illegal state"));
when(kinesisRecord.toString()).thenReturn("poison-pill");
fixture.setKinesisShardId("test-shard");
when(processSessionFactory.createSession()).thenReturn(session);
fixture.processRecords(processRecordsInput);
verify(processSessionFactory, times(1)).createSession();
// check non-poison pill records are output successfully
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 2);
final List<MockFlowFile> flowFiles = session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
assertFlowFile(flowFiles.get(0), null, "partition-1", "1", "record-1");
assertFlowFile(flowFiles.get(1), null, "partition-3", "3", "record-3");
// check the "poison pill" record was retried a 2nd time
assertNull(verify(kinesisRecord, times(2)).getPartitionKey());
assertNull(verify(kinesisRecord, times(2)).getSequenceNumber());
assertNull(verify(kinesisRecord, times(2)).getApproximateArrivalTimestamp());
assertNull(verify(kinesisRecord, times(2)).getData());
verify(checkpointer, times(1)).checkpoint();
// ERROR messages don't have their fields replaced in the MockComponentLog
assertThat(runner.getLogger().getErrorMessages().stream().filter(logMessage -> logMessage.getMsg()
.endsWith("Caught Exception while processing Kinesis record {}: {}"))
.count(), is(2L));
assertThat(runner.getLogger().getErrorMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Couldn't process Kinesis record {}, skipping.")), is(true));
session.assertCommitted();
session.assertNotRolledBack();
}
private void assertFlowFile(final MockFlowFile flowFile, final Date approxTimestamp, final String partitionKey,
final String sequenceNumber, final String content) {
if (approxTimestamp != null) {
flowFile.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP,
DATE_TIME_FORMATTER.format(ZonedDateTime.ofInstant(approxTimestamp.toInstant(), ZoneId.systemDefault())));
} else {
flowFile.assertAttributeNotExists(AbstractKinesisRecordProcessor.AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP);
}
flowFile.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_PARTITION_KEY, partitionKey);
flowFile.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_SEQUENCE_NUMBER, sequenceNumber);
flowFile.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_SHARD_ID, "test-shard");
flowFile.assertContentEquals(content);
}
}

View File

@ -0,0 +1,334 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.stream.record;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class TestKinesisRecordProcessorRecord {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
private final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
@Mock
private ProcessSessionFactory processSessionFactory;
private final SharedSessionState sharedState = new SharedSessionState(runner.getProcessor(), new AtomicLong(0));
private final MockProcessSession session = new MockProcessSession(sharedState, runner.getProcessor());
private AbstractKinesisRecordProcessor fixture;
private final RecordReaderFactory reader = new JsonTreeReader();
private final RecordSetWriterFactory writer = new JsonRecordSetWriter();
@Mock
private IRecordProcessorCheckpointer checkpointer;
@Mock
private Record kinesisRecord;
@Before
public void setUp() throws InitializationException {
MockitoAnnotations.initMocks(this);
runner.addControllerService("record-reader", reader);
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
runner.enableControllerService(reader);
runner.setProperty(ConsumeKinesisStream.RECORD_READER, "record-reader");
runner.addControllerService("record-writer", writer);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA.getValue());
runner.setProperty(writer, "output-grouping", "output-oneline");
runner.enableControllerService(writer);
runner.setProperty(ConsumeKinesisStream.RECORD_WRITER, "record-writer");
// default test fixture will try operations twice with very little wait in between
fixture = new KinesisRecordProcessorRecord(processSessionFactory, runner.getLogger(), "kinesis-test",
"endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER,
reader, writer);
}
@After
public void tearDown() {
verifyNoMoreInteractions(checkpointer, kinesisRecord, processSessionFactory);
reset(checkpointer, kinesisRecord, processSessionFactory);
}
@Test
public void testProcessRecordsEmpty() {
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(Collections.emptyList())
.withCheckpointer(checkpointer)
.withCacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
.withCacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
.withMillisBehindLatest(100L);
// would checkpoint (but should skip because there are no records processed)
fixture.setNextCheckpointTimeInMillis(System.currentTimeMillis() - 10_000L);
fixture.processRecords(processRecordsInput);
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 0);
assertThat(sharedState.getProvenanceEvents().size(), is(0));
session.assertNotCommitted();
session.assertNotRolledBack();
}
@Test
public void testProcessRecordsNoCheckpoint() {
processMultipleRecordsAssertProvenance(false);
}
@Test
public void testProcessRecordsWithEndpointOverride() {
processMultipleRecordsAssertProvenance(true);
}
private void processMultipleRecordsAssertProvenance(final boolean endpointOverridden) {
final Date firstDate = Date.from(Instant.now().minus(1, ChronoUnit.MINUTES));
final Date secondDate = new Date();
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(Arrays.asList(
new Record().withApproximateArrivalTimestamp(firstDate)
.withPartitionKey("partition-1")
.withSequenceNumber("1")
.withData(ByteBuffer.wrap("{\"record\":\"1\"}\n{\"record\":\"1b\"}".getBytes(StandardCharsets.UTF_8))),
new Record().withApproximateArrivalTimestamp(null)
.withPartitionKey("partition-no-date")
.withSequenceNumber("no-date")
.withData(ByteBuffer.wrap("{\"record\":\"no-date\"}".getBytes(StandardCharsets.UTF_8))),
new Record().withApproximateArrivalTimestamp(secondDate)
.withPartitionKey("partition-2")
.withSequenceNumber("2")
.withData(ByteBuffer.wrap("{\"record\":\"2\"}".getBytes(StandardCharsets.UTF_8)))
))
.withCheckpointer(checkpointer)
.withCacheEntryTime(null)
.withCacheExitTime(null)
.withMillisBehindLatest(null);
final String transitUriPrefix = endpointOverridden ? "https://another-endpoint.com:8443" : "http://endpoint-prefix.amazonaws.com";
if (endpointOverridden) {
fixture = new KinesisRecordProcessorRecord(processSessionFactory, runner.getLogger(), "kinesis-test",
"endpoint-prefix", "https://another-endpoint.com:8443", 10_000L, 1L, 2, DATE_TIME_FORMATTER,
reader, writer);
}
// skip checkpoint
fixture.setNextCheckpointTimeInMillis(System.currentTimeMillis() + 10_000L);
fixture.setKinesisShardId("another-shard");
when(processSessionFactory.createSession()).thenReturn(session);
fixture.processRecords(processRecordsInput);
verify(processSessionFactory, times(1)).createSession();
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 1);
assertThat(sharedState.getProvenanceEvents().size(), is(1));
assertThat(sharedState.getProvenanceEvents().get(0).getTransitUri(), is(String.format("%s/another-shard", transitUriPrefix)));
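// record-oriented output batches all parsed records into one FlowFile, so there is a single
// provenance event and its transit URI stops at the shard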
final List<MockFlowFile> flowFiles = session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
// 4 records in a single output file, with attributes matching those of the last record
assertFlowFile(flowFiles.get(0), secondDate, "partition-2", "2", "another-shard", "{\"record\":\"1\"}\n" +
"{\"record\":\"1b\"}\n" +
"{\"record\":\"no-date\"}\n" +
"{\"record\":\"2\"}", 4);
session.assertTransferCount(ConsumeKinesisStream.REL_PARSE_FAILURE, 0);
session.assertCommitted();
session.assertNotRolledBack();
}
@Test
public void testProcessPoisonPillRecordButNoRawOutputWithCheckpoint() throws ShutdownException, InvalidStateException {
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(Arrays.asList(
new Record().withApproximateArrivalTimestamp(null)
.withPartitionKey("partition-1")
.withSequenceNumber("1")
.withData(ByteBuffer.wrap("{\"record\":\"1\"}".getBytes(StandardCharsets.UTF_8))),
kinesisRecord,
new Record().withApproximateArrivalTimestamp(null)
.withPartitionKey("partition-3")
.withSequenceNumber("3")
.withData(ByteBuffer.wrap("{\"record\":\"3\"}".getBytes(StandardCharsets.UTF_8)))
))
.withCheckpointer(checkpointer)
.withCacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
.withCacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
.withMillisBehindLatest(100L);
when(kinesisRecord.getData()).thenThrow(new IllegalStateException("illegal state"));
when(kinesisRecord.toString()).thenReturn("poison-pill");
fixture.setKinesisShardId("test-shard");
when(processSessionFactory.createSession()).thenReturn(session);
fixture.processRecords(processRecordsInput);
verify(processSessionFactory, times(1)).createSession();
// check non-poison pill records are output successfully
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
// 2 successful records in a single output file, with attributes matching those of the last successful record
assertFlowFile(flowFiles.get(0), null, "partition-3", "3", "test-shard", "{\"record\":\"1\"}\n" +
"{\"record\":\"3\"}", 2);
// check no poison-pill output (as the raw data could not be retrieved)
session.assertTransferCount(ConsumeKinesisStream.REL_PARSE_FAILURE, 0);
// check the "poison pill" record was retried a 2nd time
assertNull(verify(kinesisRecord, times(2)).getData());
verify(checkpointer, times(1)).checkpoint();
// ERROR messages don't have their fields replaced in the MockComponentLog
assertThat(runner.getLogger().getErrorMessages().stream().filter(logMessage -> logMessage.getMsg()
.endsWith("Caught Exception while processing Kinesis record {}: {}"))
.count(), is(2L));
assertThat(runner.getLogger().getErrorMessages().stream().anyMatch(logMessage -> logMessage.getMsg()
.endsWith("Couldn't process Kinesis record {}, skipping.")), is(true));
session.assertCommitted();
session.assertNotRolledBack();
}
@Test
public void testProcessUnparsableRecordWithRawOutputWithCheckpoint() throws ShutdownException, InvalidStateException {
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(Arrays.asList(
new Record().withApproximateArrivalTimestamp(null)
.withPartitionKey("partition-1")
.withSequenceNumber("1")
.withData(ByteBuffer.wrap("{\"record\":\"1\"}".getBytes(StandardCharsets.UTF_8))),
kinesisRecord,
new Record().withApproximateArrivalTimestamp(null)
.withPartitionKey("partition-3")
.withSequenceNumber("3")
.withData(ByteBuffer.wrap("{\"record\":\"3\"}".getBytes(StandardCharsets.UTF_8)))
))
.withCheckpointer(checkpointer)
.withCacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
.withCacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
.withMillisBehindLatest(100L);
when(kinesisRecord.getData()).thenReturn(ByteBuffer.wrap("invalid-json".getBytes(StandardCharsets.UTF_8)));
when(kinesisRecord.getPartitionKey()).thenReturn("unparsable-partition");
when(kinesisRecord.getSequenceNumber()).thenReturn("unparsable-sequence");
when(kinesisRecord.getApproximateArrivalTimestamp()).thenReturn(null);
fixture.setKinesisShardId("test-shard");
when(processSessionFactory.createSession()).thenReturn(session);
fixture.processRecords(processRecordsInput);
verify(processSessionFactory, times(1)).createSession();
// check non-poison pill records are output successfully
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
// 2 successful records in a single output file, with attributes matching those of the last successful record
assertFlowFile(flowFiles.get(0), null, "partition-3", "3", "test-shard", "{\"record\":\"1\"}\n" +
"{\"record\":\"3\"}", 2);
// check the parse-failure output (the raw data could be retrieved but failed record parsing)
session.assertTransferCount(ConsumeKinesisStream.REL_PARSE_FAILURE, 1);
final List<MockFlowFile> failureFlowFiles = session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_PARSE_FAILURE);
assertFlowFile(failureFlowFiles.get(0), null, "unparsable-partition", "unparsable-sequence", "test-shard", "invalid-json", 0);
failureFlowFiles.get(0).assertAttributeExists("record.error.message");
// check the invalid json record was *not* retried a 2nd time
assertNull(verify(kinesisRecord, times(1)).getPartitionKey());
assertNull(verify(kinesisRecord, times(1)).getSequenceNumber());
assertNull(verify(kinesisRecord, times(1)).getApproximateArrivalTimestamp());
assertNull(verify(kinesisRecord, times(2)).getData());
verify(checkpointer, times(1)).checkpoint();
// ERROR messages don't have their fields replaced in the MockComponentLog
assertThat(runner.getLogger().getErrorMessages().stream().filter(logMessage -> logMessage.getMsg()
.endsWith("Failed to parse message from Kinesis Stream using configured Record Reader and Writer due to {}: {}"))
.count(), is(1L));
session.assertCommitted();
session.assertNotRolledBack();
}
private void assertFlowFile(final MockFlowFile flowFile, final Date approxTimestamp, final String partitionKey,
final String sequenceNumber, final String shard, final String content, final int recordCount) {
if (approxTimestamp != null) {
flowFile.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP,
DATE_TIME_FORMATTER.format(ZonedDateTime.ofInstant(approxTimestamp.toInstant(), ZoneId.systemDefault())));
} else {
flowFile.assertAttributeNotExists(AbstractKinesisRecordProcessor.AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP);
}
flowFile.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_PARTITION_KEY, partitionKey);
flowFile.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_SEQUENCE_NUMBER, sequenceNumber);
flowFile.assertAttributeEquals(AbstractKinesisRecordProcessor.AWS_KINESIS_SHARD_ID, shard);
if (recordCount > 0) {
flowFile.assertAttributeEquals("record.count", String.valueOf(recordCount));
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
} else {
flowFile.assertAttributeNotExists("record.count");
flowFile.assertAttributeNotExists(CoreAttributes.MIME_TYPE.key());
}
flowFile.assertContentEquals(content);
}
}

View File

@ -26,7 +26,10 @@
<packaging>pom</packaging>
<properties>
<aws-java-sdk-version>1.11.880</aws-java-sdk-version>
<!-- keep AWS 1.x until NIFI-8287 -->
<aws-java-sdk-version>1.11.1016</aws-java-sdk-version>
<!-- Do not upgrade to 1.14.x+ until https://github.com/awslabs/amazon-kinesis-client/issues/796 is fixed -->
<aws-kinesis-client-library-version>1.13.3</aws-kinesis-client-library-version>
</properties>
<dependencyManagement>

View File

@ -128,6 +128,9 @@
<!-- Suppress non-error messages from SMBJ which was emitting large amounts of INFO logs by default -->
<logger name="com.hierynomus.smbj" level="WARN" />
<!-- Suppress non-error messages from AWS KCL which was emitting large amounts of INFO logs by default -->
<logger name="com.amazonaws.services.kinesis" level="WARN" />
<!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
<logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
<appender-ref ref="APP_FILE"/>

View File

@ -62,6 +62,9 @@
<!-- Suppress non-error messages from SMBJ which was emitting large amounts of INFO logs by default -->
<logger name="com.hierynomus.smbj" level="WARN" />
<!-- Suppress non-error messages from AWS KCL which was emitting large amounts of INFO logs by default -->
<logger name="com.amazonaws.services.kinesis" level="WARN" />
<logger name="org.apache.nifi.stateless" level="INFO" additivity="false">
<appender-ref ref="CONSOLE" />
<appender-ref ref="APP_FILE" />