diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java index 7549ca9c83..949787b687 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java @@ -22,13 +22,15 @@ public class IpConsumer { @Value("${ips.stream}") private String IPS_STREAM; + @Value("${ips.shard.id}") private String IPS_SHARD_ID; + @Autowired private AmazonKinesis kinesis; + private GetShardIteratorResult shardIterator; - @StreamListener(Sink.INPUT) public void consume(String ip) { System.out.println(ip); @@ -40,8 +42,7 @@ public class IpConsumer { recordsRequest.setLimit(25); GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest); - while (!recordsResult.getRecords() - .isEmpty()) { + while (!recordsResult.getRecords().isEmpty()) { recordsResult.getRecords() .stream() .map(record -> new String(record.getData() diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java index 3ba8f8e4b4..f59b2161f9 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java @@ -23,6 +23,7 @@ public class IpProducer { @Value("${ips.partition.key}") private String IPS_PARTITION_KEY; + @Value("${ips.stream}") private String IPS_STREAM; diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java index b748446600..6926560244 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java @@ -1,7 +1,5 @@ package com.baeldung; -import javax.annotation.PostConstruct; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; @@ -24,8 +22,10 @@ public class KinesisApplication { @Value("${aws.access.key}") private String accessKey; + @Value("${aws.secret.key}") private String secretKey; + @Autowired private Processor processor; @@ -33,7 +33,6 @@ public class KinesisApplication { SpringApplication.run(KinesisApplication.class, args); } - @Bean public AmazonKinesis buildAmazonKinesis() { BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties index 32ab4da914..cc1f321453 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties @@ -1,15 +1,17 @@ -cloud.aws.credentials.access-key=aws-key -cloud.aws.credentials.secret-key=aws-secret -cloud.aws.region.static=eu-central-1 -cloud.aws.stack.auto=false - aws.access.key=my-aws-access-key-goes-here aws.secret.key=my-aws-secret-key-goes-here -spring.cloud.stream.bindings.output.destination=myStream +cloud.aws.credentials.access-key=my-aws-access-key +cloud.aws.credentials.secret-key=my-aws-secret-key +cloud.aws.region.static=eu-central-1 +cloud.aws.stack.auto=false + spring.cloud.stream.bindings.input.destination=live-ips spring.cloud.stream.bindings.input.group=live-ips-group spring.cloud.stream.bindings.input.content-type=text/plain +spring.cloud.stream.bindings.output.destination=myStream +spring.cloud.stream.bindings.output.content-type=text/plain + ips.partition.key=ips-partition-key ips.stream=ips-stream \ No newline at end of file