[JAVA-14987] Added missing code (#12983)

* [JAVA-14987] Added missing code

* [JAVA-14987] Added missing code

Co-authored-by: panagiotiskakos <panagiotis.kakos@libra-is.com>
This commit is contained in:
panos-kakos 2022-11-30 14:38:24 +00:00 committed by GitHub
parent 94c997f3b6
commit c5d0dbfc10
6 changed files with 185 additions and 7 deletions

View File

@ -0,0 +1,64 @@
package com.baeldung;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
@Component
@EnableBinding(Sink.class)
public class IpConsumer {
@Value("${ips.stream}")
private String IPS_STREAM;
@Value("${ips.shard.id}")
private String IPS_SHARD_ID;
@Autowired
private AmazonKinesis kinesis;
private GetShardIteratorResult shardIterator;
@StreamListener(Sink.INPUT)
public void consume(String ip) {
System.out.println(ip);
}
private void consumeWithKinesis() {
GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);
GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords()
.isEmpty()) {
recordsResult.getRecords()
.stream()
.map(record -> new String(record.getData()
.array()))
.forEach(System.out::println);
recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
recordsResult = kinesis.getRecords(recordsRequest);
}
}
@PostConstruct
private void buildShardIterator() {
GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
readShardsRequest.setStreamName(IPS_STREAM);
readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
readShardsRequest.setShardId(IPS_SHARD_ID);
this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}
}

View File

@ -0,0 +1,20 @@
package com.baeldung;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
public class IpProcessor implements IRecordProcessor {
@Override
public void initialize(InitializationInput initializationInput) { }
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.getRecords()
.forEach(record -> System.out.println(new String(record.getData().array())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) { }
}

View File

@ -0,0 +1,11 @@
package com.baeldung;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
public class IpProcessorFactory implements IRecordProcessorFactory {
@Override
public IRecordProcessor createProcessor() {
return new IpProcessor();
}
}

View File

@ -0,0 +1,55 @@
package com.baeldung;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
@Component
@EnableBinding(Source.class)
public class IpProducer {
@Value("${ips.partition.key}")
private String IPS_PARTITION_KEY;
@Value("${ips.stream}")
private String IPS_STREAM;
@Autowired
private Source source;
@Autowired
private AmazonKinesis kinesis;
@Scheduled(fixedDelay = 3000L)
private void produce() {
IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
.forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
}
@Scheduled(fixedDelay = 3000L)
private void produceWithKinesis() {
List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
entry.setPartitionKey(IPS_PARTITION_KEY);
return entry;
}).collect(Collectors.toList());
PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);
kinesis.putRecords(createRecordsRequest);
}
}

View File

@ -1,23 +1,47 @@
package com.baeldung;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
@SpringBootApplication
@EnableBinding(Processor.class)
public class KinesisApplication {
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;
@Autowired
private Processor processor;
public static void main(String[] args) {
SpringApplication.run(KinesisApplication.class, args);
}
@Autowired
private Processor processor;
@Bean
public AmazonKinesis buildAmazonKinesis() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonKinesisClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(Regions.EU_CENTRAL_1)
.build();
}
@StreamListener(Processor.INPUT)
public void consume(String val) {

View File

@ -3,9 +3,13 @@ cloud.aws.credentials.secret-key=aws-secret
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false
spring.cloud.stream.bindings.output.destination=myStream
spring.cloud.stream.bindings.output.content-type=text/plain
aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here
spring.cloud.stream.bindings.input.destination=myStream
spring.cloud.stream.bindings.input.group=myStream-group
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.output.destination=myStream
spring.cloud.stream.bindings.input.destination=live-ips
spring.cloud.stream.bindings.input.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain
ips.partition.key=ips-partition-key
ips.stream=ips-stream