From 11bcac71591140d2288abb19af873b2e4c2db791 Mon Sep 17 00:00:00 2001 From: Thoughtscript Date: Mon, 15 Jan 2018 05:01:38 +0000 Subject: [PATCH] BAEL-1175 - Finalized example and improved scripts --- .../bash/hadoop.sh | 6 +- .../boot/.gitignore | 3 + .../spring-cloud-stream-starters/boot/pom.xml | 55 +++++++++++++++++++ .../twitterhdfs/aggregate/AggregateApp.java | 18 ++++++ .../twitterhdfs/processor/ProcessorApp.java | 20 +++++++ .../baeldung/twitterhdfs/sink/SinkApp.java | 22 ++++++++ .../twitterhdfs/source/SourceApp.java | 26 +++++++++ .../src/main/resources/application.properties | 6 ++ .../spring-cloud-stream-starters/hdfs/hdfs.sh | 2 +- .../twitter/application.properties | 8 +-- .../twitter/twitter.sh | 2 +- 11 files changed, 161 insertions(+), 7 deletions(-) create mode 100644 spring-cloud/spring-cloud-stream-starters/boot/.gitignore create mode 100644 spring-cloud/spring-cloud-stream-starters/boot/pom.xml create mode 100644 spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/aggregate/AggregateApp.java create mode 100644 spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/processor/ProcessorApp.java create mode 100644 spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/sink/SinkApp.java create mode 100644 spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/source/SourceApp.java create mode 100644 spring-cloud/spring-cloud-stream-starters/boot/src/main/resources/application.properties diff --git a/spring-cloud/spring-cloud-stream-starters/bash/hadoop.sh b/spring-cloud/spring-cloud-stream-starters/bash/hadoop.sh index 5eebcca426..ca8298430b 100644 --- a/spring-cloud/spring-cloud-stream-starters/bash/hadoop.sh +++ b/spring-cloud/spring-cloud-stream-starters/bash/hadoop.sh @@ -1,3 +1,5 @@ +#!/usr/bin/env bash + # For Ubuntu 14.04 # Inspired from: https://github.com/curran/setupHadoop/blob/master/setupHadoop.sh # Use from the user directory @@ -32,7 +34,7 @@ sudo mv ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys # SSH ssh localhost -# Authenticate with local user +# authenticate with osboxes.org # Start NameNode daemon and DataNode daemon start-dfs.sh @@ -40,3 +42,5 @@ start-dfs.sh # Install Maven sudo apt-get install maven + +# Access Hadoop - http://localhost:50070 \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream-starters/boot/.gitignore b/spring-cloud/spring-cloud-stream-starters/boot/.gitignore new file mode 100644 index 0000000000..e4b82e1c0f --- /dev/null +++ b/spring-cloud/spring-cloud-stream-starters/boot/.gitignore @@ -0,0 +1,3 @@ +.idea +*/target/* +*.iml diff --git a/spring-cloud/spring-cloud-stream-starters/boot/pom.xml b/spring-cloud/spring-cloud-stream-starters/boot/pom.xml new file mode 100644 index 0000000000..3e6bc134e3 --- /dev/null +++ b/spring-cloud/spring-cloud-stream-starters/boot/pom.xml @@ -0,0 +1,55 @@ + + + 4.0.0 + com.baeldung.twitterhdfs + twitterhdfs + jar + 1.0.0 + + twitterhdfs + + + UTF-8 + UTF-8 + 1.8 + + + + org.springframework.boot + spring-boot-starter-parent + 1.5.8.RELEASE + + + + + + org.springframework.cloud.stream.app + spring-cloud-starter-stream-source-twitterstream + 1.3.1.RELEASE + + + org.springframework.cloud.stream.app + spring-cloud-starter-stream-sink-hdfs + 1.3.1.RELEASE + + + + + javax.servlet + jstl + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + twitterhdfs + + \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/aggregate/AggregateApp.java b/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/aggregate/AggregateApp.java new file mode 100644 index 0000000000..8b9ca6dc62 --- /dev/null +++ b/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/aggregate/AggregateApp.java @@ -0,0 +1,18 @@ +package com.baeldung.twitterhdfs.aggregate; + +import com.baeldung.twitterhdfs.processor.ProcessorApp; +import com.baeldung.twitterhdfs.source.SourceApp; +import com.baeldung.twitterhdfs.sink.SinkApp; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder; + +@SpringBootApplication +public class AggregateApp { + public static void main(String[] args) { + new AggregateApplicationBuilder() + .from(SourceApp.class).args("--fixedDelay=5000") + .via(ProcessorApp.class) + .to(SinkApp.class).args("--debug=true") + .run(args); + } +} \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/processor/ProcessorApp.java b/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/processor/ProcessorApp.java new file mode 100644 index 0000000000..e3bd1197f6 --- /dev/null +++ b/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/processor/ProcessorApp.java @@ -0,0 +1,20 @@ +package com.baeldung.twitterhdfs.processor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.integration.annotation.Transformer; + +@SpringBootApplication +@EnableBinding(Processor.class) +public class ProcessorApp { + Logger log = LoggerFactory.getLogger(ProcessorApp.class); + + @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) + public String processMessage(String payload) { + log.info("Payload received!"); + return payload; + } +} \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/sink/SinkApp.java b/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/sink/SinkApp.java new file mode 100644 index 0000000000..c0c1e287d3 --- /dev/null +++ b/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/sink/SinkApp.java @@ -0,0 +1,22 @@ +package com.baeldung.twitterhdfs.sink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.app.hdfs.sink.HdfsSinkConfiguration; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.context.annotation.Import; +import org.springframework.integration.annotation.ServiceActivator; + +@SpringBootApplication +@EnableBinding(Sink.class) +@Import(HdfsSinkConfiguration.class) +public class SinkApp { + Logger log = LoggerFactory.getLogger(SinkApp.class); + + @ServiceActivator(inputChannel= Sink.INPUT) + public void loggerSink(Object payload) { + log.info("Received: " + payload); + } +} \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/source/SourceApp.java b/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/source/SourceApp.java new file mode 100644 index 0000000000..f9b220561b --- /dev/null +++ b/spring-cloud/spring-cloud-stream-starters/boot/src/main/java/com/baeldung/twitterhdfs/source/SourceApp.java @@ -0,0 +1,26 @@ +package com.baeldung.twitterhdfs.source; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.app.twitterstream.source.TwitterstreamSourceConfiguration; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.context.annotation.Import; +import org.springframework.integration.annotation.InboundChannelAdapter; + +import java.text.SimpleDateFormat; +import java.util.Date; + +@SpringBootApplication +@EnableBinding(Source.class) +@Import(TwitterstreamSourceConfiguration.class) +public class SourceApp { + Logger log = LoggerFactory.getLogger(SourceApp.class); + + @InboundChannelAdapter(value = Source.OUTPUT) + public String timerMessageSource() { + return new SimpleDateFormat().format(new Date()); + } + +} diff --git a/spring-cloud/spring-cloud-stream-starters/boot/src/main/resources/application.properties b/spring-cloud/spring-cloud-stream-starters/boot/src/main/resources/application.properties new file mode 100644 index 0000000000..298a8ebf4d --- /dev/null +++ b/spring-cloud/spring-cloud-stream-starters/boot/src/main/resources/application.properties @@ -0,0 +1,6 @@ +hdfs.fs-uri=hdfs://127.0.0.1:50010/ + +twitter.credentials.access-token= +twitter.credentials.access-token-secret= +twitter.credentials.consumer-key= +twitter.credentials.consumer-secret= \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream-starters/hdfs/hdfs.sh b/spring-cloud/spring-cloud-stream-starters/hdfs/hdfs.sh index a9df476ef4..a6e6678feb 100644 --- a/spring-cloud/spring-cloud-stream-starters/hdfs/hdfs.sh +++ b/spring-cloud/spring-cloud-stream-starters/hdfs/hdfs.sh @@ -5,7 +5,7 @@ git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git # Build it ./mvnw clean install -PgenerateApps -# RUn it +# Run it cd apps # Optionally inject application.properties prior to build java -jar hdfs-sink.jar --fsUri=http://osboxes:50075 \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream-starters/twitter/application.properties b/spring-cloud/spring-cloud-stream-starters/twitter/application.properties index 5cfedee80e..e38612d25e 100644 --- a/spring-cloud/spring-cloud-stream-starters/twitter/application.properties +++ b/spring-cloud/spring-cloud-stream-starters/twitter/application.properties @@ -1,4 +1,4 @@ -twitter.credentials.access-token=932486336086286336-2HURQbA2cYzX5hixgAshIBy2Dhefupn -twitter.credentials.access-token-secret=0pyZ7etHvro8x85QSXxsqYFzYk63bK6DS5nNYy0R3l1io -twitter.credentials.consumer-key=10xCXkRYi5xLFYq3P0ymWGEwJ -twitter.credentials.consumer-secret=VfyCUcGfAQ2aWcd3uTg8GmVGyhUfAcNJU6ksG09TAtPMqhmWTS \ No newline at end of file +twitter.credentials.access-token= +twitter.credentials.access-token-secret= +twitter.credentials.consumer-key= +twitter.credentials.consumer-secret= \ No newline at end of file diff --git a/spring-cloud/spring-cloud-stream-starters/twitter/twitter.sh b/spring-cloud/spring-cloud-stream-starters/twitter/twitter.sh index 994d40dd4c..4c76fe637b 100644 --- a/spring-cloud/spring-cloud-stream-starters/twitter/twitter.sh +++ b/spring-cloud/spring-cloud-stream-starters/twitter/twitter.sh @@ -5,7 +5,7 @@ git clone https://github.com/spring-cloud-stream-app-starters/twitter.git # Build it ./mvnw clean install -PgenerateApps -# RUn it +# Run it cd apps # Optionally inject application.properties prior to build java -jar twitter_stream_source.jar --consumerKey= --consumerSecret= \