JAVA-15980 Upgrade AWS Kinesis (#13294)

* JAVA-15980 Upgrade AWS Kinesis

* JAVA-15980 Minor readability improvement
This commit is contained in:
Anastasios Ioannidis 2023-02-03 19:47:48 +02:00 committed by GitHub
parent bf69ecf4bd
commit 456ff7d029
8 changed files with 71 additions and 71 deletions

View File

@ -36,7 +36,7 @@
<dependency> <dependency>
<groupId>com.amazonaws</groupId> <groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<version>1.11.2</version> <version>1.14.9</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
@ -46,9 +46,9 @@
</dependencies> </dependencies>
<properties> <properties>
<aws-sdk.version>1.11.632</aws-sdk.version> <aws-sdk.version>1.12.380</aws-sdk.version>
<spring-cloud-stream-kinesis-binder.version>2.0.2.RELEASE</spring-cloud-stream-kinesis-binder.version> <spring-cloud-stream-kinesis-binder.version>2.2.0</spring-cloud-stream-kinesis-binder.version>
<spring-cloud-stream-test.version>2.2.1.RELEASE</spring-cloud-stream-test.version> <spring-cloud-stream-test.version>4.0.0</spring-cloud-stream-test.version>
</properties> </properties>
</project> </project>

View File

@ -1,16 +1,16 @@
package com.baeldung.binder; package com.baeldung.binder;
import org.springframework.cloud.stream.annotation.EnableBinding; import java.util.function.Consumer;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component import org.springframework.context.annotation.Bean;
@EnableBinding(Sink.class) import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerBinder { public class ConsumerBinder {
@Bean
@StreamListener(Sink.INPUT) Consumer<String> input() {
public void consume(String ip) { return str -> {
System.out.println(ip); System.out.println(str);
} };
}
} }

View File

@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication @SpringBootApplication
public class KinesisBinderApplication { public class KinesisBinderApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(KinesisBinderApplication.class, args); SpringApplication.run(KinesisBinderApplication.class, args);
} }
} }

View File

@ -1,24 +1,20 @@
package com.baeldung.binder; package com.baeldung.binder;
import java.util.function.Supplier;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean;
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.context.annotation.Configuration;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component @Configuration
@EnableBinding(Source.class) class ProducerBinder {
public class ProducerBinder {
@Autowired @Bean
private Source source; public Supplier output() {
return () -> IntStream.range(1, 200)
@Scheduled(fixedDelay = 3000L) .mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
private void produce() { .map(entry -> MessageBuilder.withPayload(entry)
IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix) .build());
.forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
} }
} }

View File

@ -1,11 +1,14 @@
package com.baeldung.kclkpl; package com.baeldung.kclkpl;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.*;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions; import com.amazonaws.regions.Regions;
@ -17,32 +20,30 @@ public class KinesisKCLApplication implements ApplicationRunner {
@Value("${aws.access.key}") @Value("${aws.access.key}")
private String accessKey; private String accessKey;
@Value("${aws.secret.key}") @Value("${aws.secret.key}")
private String secretKey; private String secretKey;
@Value("${ips.stream}") @Value("${ips.stream}")
private String IPS_STREAM; private String IPS_STREAM;
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(KinesisKCLApplication.class, args); SpringApplication.run(KinesisKCLApplication.class, args);
} }
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
"KinesisKCLConsumer",
IPS_STREAM,
new AWSStaticCredentialsProvider(awsCredentials),
"KinesisKCLConsumer")
.withRegionName(Regions.EU_CENTRAL_1.getName());
new Worker.Builder() KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration("KinesisKCLConsumer", IPS_STREAM, "", "", DEFAULT_INITIAL_POSITION_IN_STREAM, new AWSStaticCredentialsProvider(awsCredentials),
.recordProcessorFactory(new IpProcessorFactory()) new AWSStaticCredentialsProvider(awsCredentials), new AWSStaticCredentialsProvider(awsCredentials), DEFAULT_FAILOVER_TIME_MILLIS, "KinesisKCLConsumer", DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
.config(consumerConfig) DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, new ClientConfiguration(), new ClientConfiguration(),
.build() new ClientConfiguration(), DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, Regions.EU_CENTRAL_1.getName(), DEFAULT_SHUTDOWN_GRACE_MILLIS,
.run(); DEFAULT_DDB_BILLING_MODE, null, 0, 0, 0);
}
new Worker.Builder().recordProcessorFactory(new IpProcessorFactory())
.config(consumerConfig)
.build()
.run();
}
} }

View File

@ -16,23 +16,22 @@ public class KinesisKPLApplication {
@Value("${aws.access.key}") @Value("${aws.access.key}")
private String accessKey; private String accessKey;
@Value("${aws.secret.key}") @Value("${aws.secret.key}")
private String secretKey; private String secretKey;
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(KinesisKPLApplication.class, args); SpringApplication.run(KinesisKPLApplication.class, args);
} }
@Bean @Bean
public KinesisProducer kinesisProducer() { public KinesisProducer kinesisProducer() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration() KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration().setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials)) .setVerifyCertificate(false)
.setVerifyCertificate(false) .setRegion(Regions.EU_CENTRAL_1.getName());
.setRegion(Regions.EU_CENTRAL_1.getName());
return new KinesisProducer(producerConfig); return new KinesisProducer(producerConfig);
} }
} }

View File

@ -12,9 +12,11 @@ cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1 cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false cloud.aws.stack.auto=false
spring.cloud.stream.bindings.input.destination=live-ips spring.cloud.stream.bindings.input-in-0.destination=live-ips
spring.cloud.stream.bindings.input.group=live-ips-group spring.cloud.stream.bindings.input-in-0.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.function.definition = input
spring.cloud.stream.bindings.output.destination=myStream spring.cloud.stream.bindings.output-out-0.destination=myStream
spring.cloud.stream.bindings.output.content-type=text/plain spring.cloud.stream.bindings.output-out-0.content-type=text/plain
spring.cloud.stream.poller.fixed-delay = 3000

View File

@ -12,9 +12,11 @@ cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1 cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false cloud.aws.stack.auto=false
spring.cloud.stream.bindings.input.destination=live-ips spring.cloud.stream.bindings.input-in-0.destination=live-ips
spring.cloud.stream.bindings.input.group=live-ips-group spring.cloud.stream.bindings.input-in-0.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.function.definition = input
spring.cloud.stream.bindings.output.destination=myStream spring.cloud.stream.bindings.output-out-0.destination=myStream
spring.cloud.stream.bindings.output.content-type=text/plain spring.cloud.stream.bindings.output-out-0.content-type=text/plain
spring.cloud.stream.poller.fixed-delay = 3000