JAVA-14987 Added missing code and split the code in packages (#13238)

- Split the code in respective packages
- Added missing codes
This commit is contained in:
Dhawal Kapil 2023-01-04 23:10:33 +05:30 committed by GitHub
parent 9d4f599a1a
commit f28c9a79b8
16 changed files with 274 additions and 129 deletions

View File

@ -1,44 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-kinesis</artifactId>
<name>spring-cloud-stream-kinesis</name>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-kinesis</artifactId>
<name>spring-cloud-stream-kinesis</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>${spring-cloud-stream-kinesis-binder.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>${aws-sdk.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<version>${spring-cloud-stream-test.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>${aws-sdk.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<version>${spring-cloud-stream-test.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>${spring-cloud-stream-kinesis-binder.version}</version>
</dependency>
</dependencies>
<properties>
<aws-sdk.version>1.11.632</aws-sdk.version>
<spring-cloud-stream-kinesis-binder.version>2.0.2.RELEASE</spring-cloud-stream-kinesis-binder.version>
<spring-cloud-stream-test.version>2.2.1.RELEASE</spring-cloud-stream-test.version>
</properties>
<properties>
<aws-sdk.version>1.11.632</aws-sdk.version>
<spring-cloud-stream-kinesis-binder.version>2.0.2.RELEASE</spring-cloud-stream-kinesis-binder.version>
<spring-cloud-stream-test.version>2.2.1.RELEASE</spring-cloud-stream-test.version>
</properties>
</project>

View File

@ -1,53 +0,0 @@
package com.baeldung;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
@SpringBootApplication
@EnableBinding(Processor.class)
public class KinesisApplication {
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;
@Autowired
private Processor processor;
public static void main(String[] args) {
SpringApplication.run(KinesisApplication.class, args);
}
@Bean
public AmazonKinesis buildAmazonKinesis() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonKinesisClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(Regions.EU_CENTRAL_1)
.build();
}
@StreamListener(Processor.INPUT)
public void consume(String val) {
System.out.println(val);
}
public void produce(String val) {
processor.output().send(MessageBuilder.withPayload(val).build());
}
}

View File

@ -0,0 +1,16 @@
package com.baeldung.binder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ConsumerBinder {
@StreamListener(Sink.INPUT)
public void consume(String ip) {
System.out.println(ip);
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.binder;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KinesisBinderApplication {
public static void main(String[] args) {
SpringApplication.run(KinesisBinderApplication.class, args);
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.binder;
import java.util.stream.IntStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Source.class)
public class ProducerBinder {
@Autowired
private Source source;
@Scheduled(fixedDelay = 3000L)
private void produce() {
IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
.forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
}
}

View File

@ -1,4 +1,4 @@
package com.baeldung;
package com.baeldung.kclkpl;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;

View File

@ -1,4 +1,4 @@
package com.baeldung;
package com.baeldung.kclkpl;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

View File

@ -0,0 +1,30 @@
package com.baeldung.kclkpl;
import java.nio.ByteBuffer;
import java.util.stream.IntStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
@Component
public class IpProducer {
@Value("${ips.stream}")
private String IPS_STREAM;
@Value("${ips.partition.key}")
private String IPS_PARTITION_KEY;
@Autowired
private KinesisProducer kinesisProducer;
@Scheduled(fixedDelay = 3000L)
private void produce() {
IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
.forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));
}
}

View File

@ -0,0 +1,48 @@
package com.baeldung.kclkpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
@SpringBootApplication
public class KinesisKCLApplication implements ApplicationRunner {
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;
@Value("${ips.stream}")
private String IPS_STREAM;
public static void main(String[] args) {
SpringApplication.run(KinesisKCLApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
"KinesisKCLConsumer",
IPS_STREAM,
new AWSStaticCredentialsProvider(awsCredentials),
"KinesisKCLConsumer")
.withRegionName(Regions.EU_CENTRAL_1.getName());
new Worker.Builder()
.recordProcessorFactory(new IpProcessorFactory())
.config(consumerConfig)
.build()
.run();
}
}

View File

@ -0,0 +1,38 @@
package com.baeldung.kclkpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
@SpringBootApplication
public class KinesisKPLApplication {
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;
public static void main(String[] args) {
SpringApplication.run(KinesisKPLApplication.class, args);
}
@Bean
public KinesisProducer kinesisProducer() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
.setVerifyCertificate(false)
.setRegion(Regions.EU_CENTRAL_1.getName());
return new KinesisProducer(producerConfig);
}
}

View File

@ -1,12 +1,9 @@
package com.baeldung;
package com.baeldung.sdk;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
import com.amazonaws.services.kinesis.AmazonKinesis;
@ -17,8 +14,7 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
@Component
@EnableBinding(Sink.class)
public class IpConsumer {
public class ConsumerSDK {
@Value("${ips.stream}")
private String IPS_STREAM;
@ -31,12 +27,7 @@ public class IpConsumer {
private GetShardIteratorResult shardIterator;
@StreamListener(Sink.INPUT)
public void consume(String ip) {
System.out.println(ip);
}
private void consumeWithKinesis() {
public void consumeWithKinesis() {
GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);

View File

@ -0,0 +1,35 @@
package com.baeldung.sdk;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
@SpringBootApplication
public class KinesisSDKApplication {
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;
public static void main(String[] args) {
SpringApplication.run(KinesisSDKApplication.class, args);
}
@Bean
public AmazonKinesis buildAmazonKinesis() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonKinesisClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(Regions.EU_CENTRAL_1)
.build();
}
}

View File

@ -1,4 +1,4 @@
package com.baeldung;
package com.baeldung.sdk;
import java.nio.ByteBuffer;
import java.util.List;
@ -7,9 +7,6 @@ import java.util.stream.IntStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -18,8 +15,7 @@ import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
@Component
@EnableBinding(Source.class)
public class IpProducer {
public class ProducerSDK {
@Value("${ips.partition.key}")
private String IPS_PARTITION_KEY;
@ -27,17 +23,9 @@ public class IpProducer {
@Value("${ips.stream}")
private String IPS_STREAM;
@Autowired
private Source source;
@Autowired
private AmazonKinesis kinesis;
@Scheduled(fixedDelay = 3000L)
private void produce() {
IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
.forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
}
@Scheduled(fixedDelay = 3000L)
private void produceWithKinesis() {
List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {

View File

@ -1,6 +1,12 @@
# configurations for AWS SDK consumer and producer
aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here
ips.partition.key=ips-partition-key
ips.stream=ips-stream
ips.shard.id=1
# configurations for Spring Cloud Stream Kineses Binder consumer and producer
cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
@ -11,8 +17,4 @@ spring.cloud.stream.bindings.input.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.output.destination=myStream
spring.cloud.stream.bindings.output.content-type=text/plain
ips.partition.key=ips-partition-key
ips.stream=ips-stream
ips.shard.id=1
spring.cloud.stream.bindings.output.content-type=text/plain

View File

@ -5,11 +5,13 @@ import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.baeldung.kclkpl.KinesisKPLApplication;
/**
* Manual Test - this test needs correct AWS Access Key and Secret to build the Amazon Kinesis and complete successfully
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = KinesisApplication.class)
@SpringBootTest(classes = KinesisKPLApplication.class)
public class KinesisApplicationManualTest {
@Test
public void whenSpringContextIsBootstrapped_thenNoExceptions() {

View File

@ -1,6 +1,12 @@
# configurations for AWS SDK consumer and producer
aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here
ips.partition.key=ips-partition-key
ips.stream=ips-stream
ips.shard.id=1
# configurations for Spring Cloud Stream Kineses Binder consumer and producer
cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
@ -11,8 +17,4 @@ spring.cloud.stream.bindings.input.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.output.destination=myStream
spring.cloud.stream.bindings.output.content-type=text/plain
ips.partition.key=ips-partition-key
ips.stream=ips-stream
ips.shard.id=1
spring.cloud.stream.bindings.output.content-type=text/plain