NIFI-13487 ConsumeKinesisStream initial stream position handling error after AWS SDK 2.x migration

Signed-off-by: Joe Gresock <jgresock@gmail.com>
This closes #9035.
This commit is contained in:
Krisztina Zsihovszki 2024-07-01 14:54:49 +02:00 committed by Joe Gresock
parent e4014b5525
commit c73d1cf634
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
2 changed files with 11 additions and 9 deletions

View File

@ -75,6 +75,7 @@ import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;
@ -611,7 +612,6 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
synchronized Scheduler prepareScheduler(final ProcessContext context, final ProcessSessionFactory sessionFactory, final String schedulerId) {
final KinesisAsyncClient kinesisClient = getClient(context);
final ConfigsBuilder configsBuilder = prepareConfigsBuilder(context, schedulerId, sessionFactory);
final MetricsConfig metricsConfig = configsBuilder.metricsConfig();
if (!isReportCloudWatchMetrics(context)) {
metricsConfig.metricsFactory(new NullMetricsFactory());
@ -623,18 +623,13 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.streamName(streamName);
final CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig().workerStateChangeListener(workerState::set);
final InitialPositionInStream initialPositionInStream = getInitialPositionInStream(context);
final InitialPositionInStreamExtended initialPositionInStreamValue = (InitialPositionInStream.AT_TIMESTAMP == initialPositionInStream)
? InitialPositionInStreamExtended.newInitialPositionAtTimestamp(getStartStreamTimestamp(context))
: InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
leaseManagementConfig.initialPositionInStream(initialPositionInStreamValue);
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
.keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList());
final RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
@ -715,8 +710,13 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
*/
@VisibleForTesting
ConfigsBuilder prepareConfigsBuilder(final ProcessContext context, final String workerId, final ProcessSessionFactory sessionFactory) {
final InitialPositionInStream initialPositionInStream = getInitialPositionInStream(context);
final InitialPositionInStreamExtended initialPositionInStreamValue = (InitialPositionInStream.AT_TIMESTAMP == initialPositionInStream)
? InitialPositionInStreamExtended.newInitialPositionAtTimestamp(getStartStreamTimestamp(context))
: InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
return new ConfigsBuilder(
getStreamName(context),
new SingleStreamTracker(getStreamName(context), initialPositionInStreamValue),
getApplicationName(context),
getClient(context),
getDynamoClient(context),

View File

@ -269,6 +269,7 @@ public class TestConsumeKinesisStream {
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.APPLICATION_NAME, "test-application");
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.REGION, Regions.EU_WEST_2.getName());
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.TIMEOUT, "5 secs");
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.INITIAL_STREAM_POSITION, "TRIM_HORIZON");
final AWSCredentialsProviderService awsCredentialsProviderService = new AWSCredentialsProviderControllerService();
mockConsumeKinesisStreamRunner.addControllerService("aws-credentials", awsCredentialsProviderService);
@ -326,7 +327,8 @@ public class TestConsumeKinesisStream {
assertTrue(scheduler.leaseManagementConfig().workerIdentifier().startsWith(hostname));
assertEquals(scheduler.coordinatorConfig().applicationName(), "test-application");
assertEquals(scheduler.leaseManagementConfig().streamName(), "test-stream");
assertEquals(scheduler.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.LATEST);
assertEquals(scheduler.retrievalConfig().streamTracker().streamConfigList().get(0).initialPositionInStreamExtended().getInitialPositionInStream(),
InitialPositionInStream.TRIM_HORIZON );
assertEquals(scheduler.coordinatorConfig().parentShardPollIntervalMillis(), 1);
}