diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java new file mode 100644 index 0000000000..7549ca9c83 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java @@ -0,0 +1,64 @@ +package com.baeldung; + +import javax.annotation.PostConstruct; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.stereotype.Component; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.ShardIteratorType; + +@Component +@EnableBinding(Sink.class) +public class IpConsumer { + + @Value("${ips.stream}") + private String IPS_STREAM; + @Value("${ips.shard.id}") + private String IPS_SHARD_ID; + @Autowired + private AmazonKinesis kinesis; + private GetShardIteratorResult shardIterator; + + + @StreamListener(Sink.INPUT) + public void consume(String ip) { + System.out.println(ip); + } + + private void consumeWithKinesis() { + GetRecordsRequest recordsRequest = new GetRecordsRequest(); + recordsRequest.setShardIterator(shardIterator.getShardIterator()); + recordsRequest.setLimit(25); + + GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest); + while (!recordsResult.getRecords() + .isEmpty()) { + recordsResult.getRecords() + .stream() + .map(record -> new String(record.getData() + .array())) + .forEach(System.out::println); + + recordsRequest.setShardIterator(recordsResult.getNextShardIterator()); + recordsResult = kinesis.getRecords(recordsRequest); + } + } + + @PostConstruct + private void buildShardIterator() { + GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest(); + readShardsRequest.setStreamName(IPS_STREAM); + readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST); + readShardsRequest.setShardId(IPS_SHARD_ID); + this.shardIterator = kinesis.getShardIterator(readShardsRequest); + } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessor.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessor.java new file mode 100644 index 0000000000..32e6babc86 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessor.java @@ -0,0 +1,20 @@ +package com.baeldung; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; + +public class IpProcessor implements IRecordProcessor { + @Override + public void initialize(InitializationInput initializationInput) { } + + @Override + public void processRecords(ProcessRecordsInput processRecordsInput) { + processRecordsInput.getRecords() + .forEach(record -> System.out.println(new String(record.getData().array()))); + } + + @Override + public void shutdown(ShutdownInput shutdownInput) { } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessorFactory.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessorFactory.java new file mode 100644 index 0000000000..1ca774bb39 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessorFactory.java @@ -0,0 +1,11 @@ +package com.baeldung; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; + +public class IpProcessorFactory implements IRecordProcessorFactory { + @Override + public IRecordProcessor createProcessor() { + return new IpProcessor(); + } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java new file mode 100644 index 0000000000..3ba8f8e4b4 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java @@ -0,0 +1,55 @@ +package com.baeldung; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.PutRecordsRequest; +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; + +@Component +@EnableBinding(Source.class) +public class IpProducer { + + @Value("${ips.partition.key}") + private String IPS_PARTITION_KEY; + @Value("${ips.stream}") + private String IPS_STREAM; + + @Autowired + private Source source; + @Autowired + private AmazonKinesis kinesis; + + @Scheduled(fixedDelay = 3000L) + private void produce() { + IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix) + .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build())); + } + + @Scheduled(fixedDelay = 3000L) + private void produceWithKinesis() { + List entries = IntStream.range(1, 200).mapToObj(ipSuffix -> { + PutRecordsRequestEntry entry = new PutRecordsRequestEntry(); + entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes())); + entry.setPartitionKey(IPS_PARTITION_KEY); + return entry; + }).collect(Collectors.toList()); + + PutRecordsRequest createRecordsRequest = new PutRecordsRequest(); + createRecordsRequest.setStreamName(IPS_STREAM); + createRecordsRequest.setRecords(entries); + + kinesis.putRecords(createRecordsRequest); + } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java index c863cd8fe2..b748446600 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java @@ -1,23 +1,47 @@ package com.baeldung; +import javax.annotation.PostConstruct; + import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.context.annotation.Bean; import org.springframework.messaging.support.MessageBuilder; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; + @SpringBootApplication @EnableBinding(Processor.class) public class KinesisApplication { + @Value("${aws.access.key}") + private String accessKey; + @Value("${aws.secret.key}") + private String secretKey; + @Autowired + private Processor processor; + public static void main(String[] args) { SpringApplication.run(KinesisApplication.class, args); } - @Autowired - private Processor processor; + + @Bean + public AmazonKinesis buildAmazonKinesis() { + BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); + return AmazonKinesisClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) + .withRegion(Regions.EU_CENTRAL_1) + .build(); + } @StreamListener(Processor.INPUT) public void consume(String val) { diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties index 1a966c64fb..32ab4da914 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties @@ -3,9 +3,13 @@ cloud.aws.credentials.secret-key=aws-secret cloud.aws.region.static=eu-central-1 cloud.aws.stack.auto=false -spring.cloud.stream.bindings.output.destination=myStream -spring.cloud.stream.bindings.output.content-type=text/plain +aws.access.key=my-aws-access-key-goes-here +aws.secret.key=my-aws-secret-key-goes-here -spring.cloud.stream.bindings.input.destination=myStream -spring.cloud.stream.bindings.input.group=myStream-group -spring.cloud.stream.bindings.input.content-type=text/plain \ No newline at end of file +spring.cloud.stream.bindings.output.destination=myStream +spring.cloud.stream.bindings.input.destination=live-ips +spring.cloud.stream.bindings.input.group=live-ips-group +spring.cloud.stream.bindings.input.content-type=text/plain + +ips.partition.key=ips-partition-key +ips.stream=ips-stream \ No newline at end of file