From f28c9a79b835677dd8bb04c48208d5c31122716f Mon Sep 17 00:00:00 2001 From: Dhawal Kapil Date: Wed, 4 Jan 2023 23:10:33 +0530 Subject: [PATCH] JAVA-14987 Added missing code and split the code in packages (#13238) - Split the code in respective packages - Added missing codes --- .../spring-cloud-stream-kinesis/pom.xml | 84 +++++++++++-------- .../java/com/baeldung/KinesisApplication.java | 53 ------------ .../com/baeldung/binder/ConsumerBinder.java | 16 ++++ .../binder/KinesisBinderApplication.java | 12 +++ .../com/baeldung/binder/ProducerBinder.java | 24 ++++++ .../baeldung/{ => kclkpl}/IpProcessor.java | 2 +- .../{ => kclkpl}/IpProcessorFactory.java | 2 +- .../java/com/baeldung/kclkpl/IpProducer.java | 30 +++++++ .../kclkpl/KinesisKCLApplication.java | 48 +++++++++++ .../kclkpl/KinesisKPLApplication.java | 38 +++++++++ .../{IpConsumer.java => sdk/ConsumerSDK.java} | 15 +--- .../baeldung/sdk/KinesisSDKApplication.java | 35 ++++++++ .../{IpProducer.java => sdk/ProducerSDK.java} | 16 +--- .../src/main/resources/application.properties | 12 +-- .../KinesisApplicationManualTest.java | 4 +- .../src/test/resources/application.properties | 12 +-- 16 files changed, 274 insertions(+), 129 deletions(-) delete mode 100644 spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java create mode 100644 spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ConsumerBinder.java create mode 100644 spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/KinesisBinderApplication.java create mode 100644 spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ProducerBinder.java rename spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/{ => kclkpl}/IpProcessor.java (96%) rename spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/{ => kclkpl}/IpProcessorFactory.java (92%) create mode 100644 spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProducer.java create mode 100644 spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKCLApplication.java create mode 100644 spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKPLApplication.java rename spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/{IpConsumer.java => sdk/ConsumerSDK.java} (82%) create mode 100644 spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/KinesisSDKApplication.java rename spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/{IpProducer.java => sdk/ProducerSDK.java} (72%) diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/pom.xml b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/pom.xml index c7d3f5d12c..397f06399f 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/pom.xml +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/pom.xml @@ -1,44 +1,54 @@ - 4.0.0 - spring-cloud-stream-kinesis - spring-cloud-stream-kinesis + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + spring-cloud-stream-kinesis + spring-cloud-stream-kinesis - - com.baeldung - spring-cloud-stream - 1.0.0-SNAPSHOT - + + com.baeldung + spring-cloud-stream + 1.0.0-SNAPSHOT + - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.cloud - spring-cloud-stream-binder-kinesis - ${spring-cloud-stream-kinesis-binder.version} - - - com.amazonaws - aws-java-sdk-kinesis - ${aws-sdk.version} - - - org.springframework.cloud - spring-cloud-stream-test-support - ${spring-cloud-stream-test.version} - test - - + + + org.springframework.boot + spring-boot-starter-web + + + com.amazonaws + aws-java-sdk-kinesis + ${aws-sdk.version} + + + org.springframework.cloud + spring-cloud-stream-test-support + ${spring-cloud-stream-test.version} + test + + + com.amazonaws + amazon-kinesis-producer + 0.13.1 + + + com.amazonaws + amazon-kinesis-client + 1.11.2 + + + org.springframework.cloud + spring-cloud-stream-binder-kinesis + ${spring-cloud-stream-kinesis-binder.version} + + - - 1.11.632 - 2.0.2.RELEASE - 2.2.1.RELEASE - + + 1.11.632 + 2.0.2.RELEASE + 2.2.1.RELEASE + \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java deleted file mode 100644 index 6926560244..0000000000 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/KinesisApplication.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.baeldung; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.context.annotation.Bean; -import org.springframework.messaging.support.MessageBuilder; - -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; - -@SpringBootApplication -@EnableBinding(Processor.class) -public class KinesisApplication { - - @Value("${aws.access.key}") - private String accessKey; - - @Value("${aws.secret.key}") - private String secretKey; - - @Autowired - private Processor processor; - - public static void main(String[] args) { - SpringApplication.run(KinesisApplication.class, args); - } - - @Bean - public AmazonKinesis buildAmazonKinesis() { - BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); - return AmazonKinesisClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) - .withRegion(Regions.EU_CENTRAL_1) - .build(); - } - - @StreamListener(Processor.INPUT) - public void consume(String val) { - System.out.println(val); - } - - public void produce(String val) { - processor.output().send(MessageBuilder.withPayload(val).build()); - } -} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ConsumerBinder.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ConsumerBinder.java new file mode 100644 index 0000000000..38ad634086 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ConsumerBinder.java @@ -0,0 +1,16 @@ +package com.baeldung.binder; + +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.stereotype.Component; + +@Component +@EnableBinding(Sink.class) +public class ConsumerBinder { + + @StreamListener(Sink.INPUT) + public void consume(String ip) { + System.out.println(ip); + } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/KinesisBinderApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/KinesisBinderApplication.java new file mode 100644 index 0000000000..e4f6916ed9 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/KinesisBinderApplication.java @@ -0,0 +1,12 @@ +package com.baeldung.binder; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KinesisBinderApplication { + + public static void main(String[] args) { + SpringApplication.run(KinesisBinderApplication.class, args); + } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ProducerBinder.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ProducerBinder.java new file mode 100644 index 0000000000..468f2886de --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ProducerBinder.java @@ -0,0 +1,24 @@ +package com.baeldung.binder; + +import java.util.stream.IntStream; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Component +@EnableBinding(Source.class) +public class ProducerBinder { + + @Autowired + private Source source; + + @Scheduled(fixedDelay = 3000L) + private void produce() { + IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix) + .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build())); + } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessor.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProcessor.java similarity index 96% rename from spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessor.java rename to spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProcessor.java index 32e6babc86..c028f530dc 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessor.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProcessor.java @@ -1,4 +1,4 @@ -package com.baeldung; +package com.baeldung.kclkpl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessorFactory.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProcessorFactory.java similarity index 92% rename from spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessorFactory.java rename to spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProcessorFactory.java index 1ca774bb39..7515e65eff 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProcessorFactory.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProcessorFactory.java @@ -1,4 +1,4 @@ -package com.baeldung; +package com.baeldung.kclkpl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProducer.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProducer.java new file mode 100644 index 0000000000..76111cfe57 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/IpProducer.java @@ -0,0 +1,30 @@ +package com.baeldung.kclkpl; + +import java.nio.ByteBuffer; +import java.util.stream.IntStream; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import com.amazonaws.services.kinesis.producer.KinesisProducer; + +@Component +public class IpProducer { + + @Value("${ips.stream}") + private String IPS_STREAM; + + @Value("${ips.partition.key}") + private String IPS_PARTITION_KEY; + + @Autowired + private KinesisProducer kinesisProducer; + + @Scheduled(fixedDelay = 3000L) + private void produce() { + IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes())) + .forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry)); + } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKCLApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKCLApplication.java new file mode 100644 index 0000000000..01c5af596d --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKCLApplication.java @@ -0,0 +1,48 @@ +package com.baeldung.kclkpl; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; + +@SpringBootApplication +public class KinesisKCLApplication implements ApplicationRunner { + + @Value("${aws.access.key}") + private String accessKey; + + @Value("${aws.secret.key}") + private String secretKey; + + @Value("${ips.stream}") + private String IPS_STREAM; + + public static void main(String[] args) { + SpringApplication.run(KinesisKCLApplication.class, args); + } + + @Override + public void run(ApplicationArguments args) throws Exception { + BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); + KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration( + "KinesisKCLConsumer", + IPS_STREAM, + new AWSStaticCredentialsProvider(awsCredentials), + "KinesisKCLConsumer") + .withRegionName(Regions.EU_CENTRAL_1.getName()); + + new Worker.Builder() + .recordProcessorFactory(new IpProcessorFactory()) + .config(consumerConfig) + .build() + .run(); + } + +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKPLApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKPLApplication.java new file mode 100644 index 0000000000..4ff7cf8087 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKPLApplication.java @@ -0,0 +1,38 @@ +package com.baeldung.kclkpl; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; + +@SpringBootApplication +public class KinesisKPLApplication { + + @Value("${aws.access.key}") + private String accessKey; + + @Value("${aws.secret.key}") + private String secretKey; + + public static void main(String[] args) { + SpringApplication.run(KinesisKPLApplication.class, args); + } + + @Bean + public KinesisProducer kinesisProducer() { + BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); + KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration() + .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials)) + .setVerifyCertificate(false) + .setRegion(Regions.EU_CENTRAL_1.getName()); + + return new KinesisProducer(producerConfig); + } + +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/ConsumerSDK.java similarity index 82% rename from spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java rename to spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/ConsumerSDK.java index 949787b687..d95d66b75a 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpConsumer.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/ConsumerSDK.java @@ -1,12 +1,9 @@ -package com.baeldung; +package com.baeldung.sdk; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; import com.amazonaws.services.kinesis.AmazonKinesis; @@ -17,8 +14,7 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.ShardIteratorType; @Component -@EnableBinding(Sink.class) -public class IpConsumer { +public class ConsumerSDK { @Value("${ips.stream}") private String IPS_STREAM; @@ -31,12 +27,7 @@ public class IpConsumer { private GetShardIteratorResult shardIterator; - @StreamListener(Sink.INPUT) - public void consume(String ip) { - System.out.println(ip); - } - - private void consumeWithKinesis() { + public void consumeWithKinesis() { GetRecordsRequest recordsRequest = new GetRecordsRequest(); recordsRequest.setShardIterator(shardIterator.getShardIterator()); recordsRequest.setLimit(25); diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/KinesisSDKApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/KinesisSDKApplication.java new file mode 100644 index 0000000000..28901c0723 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/KinesisSDKApplication.java @@ -0,0 +1,35 @@ +package com.baeldung.sdk; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; + +@SpringBootApplication +public class KinesisSDKApplication { + + @Value("${aws.access.key}") + private String accessKey; + + @Value("${aws.secret.key}") + private String secretKey; + + public static void main(String[] args) { + SpringApplication.run(KinesisSDKApplication.class, args); + } + + @Bean + public AmazonKinesis buildAmazonKinesis() { + BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); + return AmazonKinesisClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) + .withRegion(Regions.EU_CENTRAL_1) + .build(); + } +} \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/ProducerSDK.java similarity index 72% rename from spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java rename to spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/ProducerSDK.java index f59b2161f9..76ece8ddb7 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/IpProducer.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/sdk/ProducerSDK.java @@ -1,4 +1,4 @@ -package com.baeldung; +package com.baeldung.sdk; import java.nio.ByteBuffer; import java.util.List; @@ -7,9 +7,6 @@ import java.util.stream.IntStream; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.messaging.Source; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -18,8 +15,7 @@ import com.amazonaws.services.kinesis.model.PutRecordsRequest; import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; @Component -@EnableBinding(Source.class) -public class IpProducer { +public class ProducerSDK { @Value("${ips.partition.key}") private String IPS_PARTITION_KEY; @@ -27,17 +23,9 @@ public class IpProducer { @Value("${ips.stream}") private String IPS_STREAM; - @Autowired - private Source source; @Autowired private AmazonKinesis kinesis; - @Scheduled(fixedDelay = 3000L) - private void produce() { - IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix) - .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build())); - } - @Scheduled(fixedDelay = 3000L) private void produceWithKinesis() { List entries = IntStream.range(1, 200).mapToObj(ipSuffix -> { diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties index 1943766c26..777abef1cc 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties @@ -1,6 +1,12 @@ +# configurations for AWS SDK consumer and producer aws.access.key=my-aws-access-key-goes-here aws.secret.key=my-aws-secret-key-goes-here +ips.partition.key=ips-partition-key +ips.stream=ips-stream +ips.shard.id=1 + +# configurations for Spring Cloud Stream Kineses Binder consumer and producer cloud.aws.credentials.access-key=my-aws-access-key cloud.aws.credentials.secret-key=my-aws-secret-key cloud.aws.region.static=eu-central-1 @@ -11,8 +17,4 @@ spring.cloud.stream.bindings.input.group=live-ips-group spring.cloud.stream.bindings.input.content-type=text/plain spring.cloud.stream.bindings.output.destination=myStream -spring.cloud.stream.bindings.output.content-type=text/plain - -ips.partition.key=ips-partition-key -ips.stream=ips-stream -ips.shard.id=1 \ No newline at end of file +spring.cloud.stream.bindings.output.content-type=text/plain \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/java/com/baeldung/KinesisApplicationManualTest.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/java/com/baeldung/KinesisApplicationManualTest.java index a232d29be5..bbe871ea11 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/java/com/baeldung/KinesisApplicationManualTest.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/java/com/baeldung/KinesisApplicationManualTest.java @@ -5,11 +5,13 @@ import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +import com.baeldung.kclkpl.KinesisKPLApplication; + /** * Manual Test - this test needs correct AWS Access Key and Secret to build the Amazon Kinesis and complete successfully */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = KinesisApplication.class) +@SpringBootTest(classes = KinesisKPLApplication.class) public class KinesisApplicationManualTest { @Test public void whenSpringContextIsBootstrapped_thenNoExceptions() { diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/resources/application.properties b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/resources/application.properties index 1943766c26..777abef1cc 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/resources/application.properties +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/resources/application.properties @@ -1,6 +1,12 @@ +# configurations for AWS SDK consumer and producer aws.access.key=my-aws-access-key-goes-here aws.secret.key=my-aws-secret-key-goes-here +ips.partition.key=ips-partition-key +ips.stream=ips-stream +ips.shard.id=1 + +# configurations for Spring Cloud Stream Kineses Binder consumer and producer cloud.aws.credentials.access-key=my-aws-access-key cloud.aws.credentials.secret-key=my-aws-secret-key cloud.aws.region.static=eu-central-1 @@ -11,8 +17,4 @@ spring.cloud.stream.bindings.input.group=live-ips-group spring.cloud.stream.bindings.input.content-type=text/plain spring.cloud.stream.bindings.output.destination=myStream -spring.cloud.stream.bindings.output.content-type=text/plain - -ips.partition.key=ips-partition-key -ips.stream=ips-stream -ips.shard.id=1 \ No newline at end of file +spring.cloud.stream.bindings.output.content-type=text/plain \ No newline at end of file