diff --git a/spring-cloud-data-flow/etl/README.MD b/spring-cloud-data-flow/etl/README.MD
new file mode 100644
index 0000000000..0cbb460b01
--- /dev/null
+++ b/spring-cloud-data-flow/etl/README.MD
@@ -0,0 +1,9 @@
+# Overview
+This is an example of a ETL stream pipeline, mixing a starter application with custom transform and sink.
+
+# Applications
+JDBC Source - Application Starter distributed by default
+
+customer-transform - Custom application to transform the data
+
+customer-mongodb-sink - Custom application to sink the data
diff --git a/spring-cloud-data-flow/etl/customer-mongodb-sink/pom.xml b/spring-cloud-data-flow/etl/customer-mongodb-sink/pom.xml
new file mode 100644
index 0000000000..468d8e17d0
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-mongodb-sink/pom.xml
@@ -0,0 +1,75 @@
+
+
+ 4.0.0
+
+ com.customer
+ customer-mongodb-sink
+ jar
+
+ customer-mongodb-sink
+ Example ETL Load Project
+
+
+ parent-boot-2
+ com.baeldung
+ 0.0.1-SNAPSHOT
+ ../../../parent-boot-2
+
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ Finchley.SR1
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rabbit
+
+
+ org.springframework.boot
+ spring-boot-starter-data-mongodb
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-stream-test-support
+ test
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
+
diff --git a/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/Customer.java b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/Customer.java
new file mode 100644
index 0000000000..cf44aec5b7
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/Customer.java
@@ -0,0 +1,27 @@
+package com.customer.customermongodbsink;
+
+import org.springframework.data.mongodb.core.mapping.Document;
+
+@Document(collection = "customer")
+public class Customer {
+
+ private Long id;
+ private String name;
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerListener.java b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerListener.java
new file mode 100644
index 0000000000..c841daea8a
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerListener.java
@@ -0,0 +1,18 @@
+package com.customer.customermongodbsink;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Sink;
+
+@EnableBinding(Sink.class)
+public class CustomerListener {
+
+ @Autowired
+ private CustomerRepository repository;
+
+ @StreamListener(Sink.INPUT)
+ public void save(Customer customer) {
+ repository.save(customer);
+ }
+}
diff --git a/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerMongodbSinkApplication.java b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerMongodbSinkApplication.java
new file mode 100644
index 0000000000..2ef311457e
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerMongodbSinkApplication.java
@@ -0,0 +1,12 @@
+package com.customer.customermongodbsink;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class CustomerMongodbSinkApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(CustomerMongodbSinkApplication.class, args);
+ }
+}
diff --git a/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerRepository.java b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerRepository.java
new file mode 100644
index 0000000000..f921ff51cf
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/java/com/customer/customermongodbsink/CustomerRepository.java
@@ -0,0 +1,9 @@
+package com.customer.customermongodbsink;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface CustomerRepository extends MongoRepository {
+
+}
\ No newline at end of file
diff --git a/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/resources/application.properties b/spring-cloud-data-flow/etl/customer-mongodb-sink/src/main/resources/application.properties
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/spring-cloud-data-flow/etl/customer-transform/pom.xml b/spring-cloud-data-flow/etl/customer-transform/pom.xml
new file mode 100644
index 0000000000..bc4b648907
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-transform/pom.xml
@@ -0,0 +1,68 @@
+
+
+ 4.0.0
+
+ com.customer
+ customer-transform
+ 0.0.1-SNAPSHOT
+ jar
+
+ customer-transform
+ Example transform ETL step
+
+
+ parent-boot-2
+ com.baeldung
+ 0.0.1-SNAPSHOT
+ ../../../parent-boot-2
+
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ Finchley.SR1
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rabbit
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-stream-test-support
+ test
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
+
diff --git a/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/Customer.java b/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/Customer.java
new file mode 100644
index 0000000000..f0e4d79388
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/Customer.java
@@ -0,0 +1,29 @@
+package com.customer.customertransform;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class Customer {
+
+ private Long id;
+
+ private String name;
+
+ @JsonProperty("customer_name")
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @JsonProperty("name")
+ public String getName() {
+ return name;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/CustomerProcessorConfiguration.java b/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/CustomerProcessorConfiguration.java
new file mode 100644
index 0000000000..c99fcf55be
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/CustomerProcessorConfiguration.java
@@ -0,0 +1,16 @@
+package com.customer.customertransform;
+
+
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.integration.annotation.Transformer;
+
+@EnableBinding(Processor.class)
+public class CustomerProcessorConfiguration {
+
+ @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
+ public Customer convertToPojo(Customer payload) {
+
+ return payload;
+ }
+}
\ No newline at end of file
diff --git a/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/CustomerTransformApplication.java b/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/CustomerTransformApplication.java
new file mode 100644
index 0000000000..8781f4da54
--- /dev/null
+++ b/spring-cloud-data-flow/etl/customer-transform/src/main/java/com/customer/customertransform/CustomerTransformApplication.java
@@ -0,0 +1,12 @@
+package com.customer.customertransform;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class CustomerTransformApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(CustomerTransformApplication.class, args);
+ }
+}
diff --git a/spring-cloud-data-flow/etl/customer-transform/src/main/resources/application.properties b/spring-cloud-data-flow/etl/customer-transform/src/main/resources/application.properties
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/spring-cloud-data-flow/etl/pom.xml b/spring-cloud-data-flow/etl/pom.xml
new file mode 100644
index 0000000000..2b904f6e0d
--- /dev/null
+++ b/spring-cloud-data-flow/etl/pom.xml
@@ -0,0 +1,20 @@
+
+ 4.0.0
+ org.baeldung.spring.cloud
+ etl-spring-cloud-data-flow
+ 0.0.1-SNAPSHOT
+ pom
+
+
+ org.baeldung.spring.cloud
+ spring-cloud-data-flow
+ 0.0.1-SNAPSHOT
+
+
+
+ customer-mongodb-sink
+ customer-transform
+
+
+
diff --git a/spring-cloud-data-flow/pom.xml b/spring-cloud-data-flow/pom.xml
index 5f24aa2cbd..5a007f3c7d 100644
--- a/spring-cloud-data-flow/pom.xml
+++ b/spring-cloud-data-flow/pom.xml
@@ -19,6 +19,7 @@
time-processor
log-sink
batch-job
+ etl