From 261694815af5e644f8036846330109df126123a3 Mon Sep 17 00:00:00 2001 From: Ekaterina Galkina Date: Tue, 30 Oct 2018 19:22:35 +0500 Subject: [PATCH] BAEL-2276 --- .../PublishSubscibeChannelExample.java | 72 ++++++++++++++ .../RouteToRecipientsExample.java | 71 +++++++++++++ .../separateflows/SeparateFlowsExample.java | 99 +++++++++++++++++++ .../subflowchannel/FilterExample.java | 75 ++++++++++++++ .../subflowmapping/RouterExample.java | 70 +++++++++++++ 5 files changed, 387 insertions(+) create mode 100644 spring-integration/src/main/java/com/baeldung/subflows/publishsubscribechannel/PublishSubscibeChannelExample.java create mode 100644 spring-integration/src/main/java/com/baeldung/subflows/routeToRecipients/RouteToRecipientsExample.java create mode 100644 spring-integration/src/main/java/com/baeldung/subflows/separateflows/SeparateFlowsExample.java create mode 100644 spring-integration/src/main/java/com/baeldung/subflows/subflowchannel/FilterExample.java create mode 100644 spring-integration/src/main/java/com/baeldung/subflows/subflowmapping/RouterExample.java diff --git a/spring-integration/src/main/java/com/baeldung/subflows/publishsubscribechannel/PublishSubscibeChannelExample.java b/spring-integration/src/main/java/com/baeldung/subflows/publishsubscribechannel/PublishSubscibeChannelExample.java new file mode 100644 index 0000000000..ad1535da6f --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/subflows/publishsubscribechannel/PublishSubscibeChannelExample.java @@ -0,0 +1,72 @@ +package com.baeldung.subflows.publishsubscribechannel; + +import java.util.Arrays; +import java.util.Collection; + +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; + +@EnableIntegration +@IntegrationComponentScan +public class PublishSubscibeChannelExample { + + @MessagingGateway + public interface I { + + @Gateway(requestChannel = "flow.input") + void flow(Collection is); + + } + + @Bean + DirectChannel multipleof3Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs1Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs2Channel() { + return new DirectChannel(); + } + + @Bean + public IntegrationFlow flow() { + return flow -> flow.split() + .publishSubscribeChannel(s -> + s.subscribe(f -> f. filter(p -> p % 3 == 0).channel("multipleof3Channel")) + .subscribe(f -> f. filter(p -> p % 3 == 1).channel("remainderIs1Channel")) + .subscribe(f -> f. filter(p -> p % 3 == 2).channel("remainderIs2Channel")) + ); + } + + public static void main(String[] args) { + final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(PublishSubscibeChannelExample.class); + + DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class); + multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x)); + + DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class); + remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x)); + + DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class); + remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x)); + + ctx.getBean(I.class) + .flow(Arrays.asList(1, 2, 3, 4, 5, 6)); + + ctx.close(); + + } + +} diff --git a/spring-integration/src/main/java/com/baeldung/subflows/routeToRecipients/RouteToRecipientsExample.java b/spring-integration/src/main/java/com/baeldung/subflows/routeToRecipients/RouteToRecipientsExample.java new file mode 100644 index 0000000000..c22072b1ff --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/subflows/routeToRecipients/RouteToRecipientsExample.java @@ -0,0 +1,71 @@ +package com.baeldung.subflows.routeToRecipients; + +import java.util.Arrays; +import java.util.Collection; + +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; + +@EnableIntegration +@IntegrationComponentScan +public class RouteToRecipientsExample { + + @MessagingGateway + public interface I { + + @Gateway(requestChannel = "flow.input") + void flow(Collection is); + + } + + @Bean + DirectChannel multipleof3Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs1Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs2Channel() { + return new DirectChannel(); + } + + @Bean + public IntegrationFlow flow() { + return flow -> flow.split() + + .routeToRecipients(r -> r. recipient("multipleof3Channel", p -> p % 3 == 0)// filter + . recipient("remainderIs1Channel", p -> p % 3 == 1) + .recipientFlow(sf -> sf. filter(p -> p % 3 == 2) + .channel("remainderIs2Channel"))); + } + + public static void main(String[] args) { + final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(RouteToRecipientsExample.class); + + DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class); + multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x)); + + DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class); + remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x)); + + DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class); + remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x)); + + ctx.getBean(I.class) + .flow(Arrays.asList(1, 2, 3, 4, 5, 6)); + + ctx.close(); + + } +} \ No newline at end of file diff --git a/spring-integration/src/main/java/com/baeldung/subflows/separateflows/SeparateFlowsExample.java b/spring-integration/src/main/java/com/baeldung/subflows/separateflows/SeparateFlowsExample.java new file mode 100644 index 0000000000..ccd49affd0 --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/subflows/separateflows/SeparateFlowsExample.java @@ -0,0 +1,99 @@ +package com.baeldung.subflows.separateflows; + +import java.util.Arrays; +import java.util.Collection; + +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; + +@EnableIntegration +@IntegrationComponentScan +public class SeparateFlowsExample { + + @MessagingGateway + public interface I { + + @Gateway(requestChannel = "multipleof3Flow.input") + void multipleof3(Collection is); + + @Gateway(requestChannel = "remainderIs1Flow.input") + void remainderIs1(Collection is); + + @Gateway(requestChannel = "remainderIs2Flow.input") + void remainderIs2(Collection is); + + } + + @Bean + DirectChannel multipleof3Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs1Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs2Channel() { + return new DirectChannel(); + } + + @Bean + public IntegrationFlow multipleof3Flow() { + return f -> f.split() + . filter(p -> p % 3 == 0) + .channel("multipleof3Channel"); + + } + + @Bean + public IntegrationFlow remainderIs1Flow() { + return f -> f.split() + . filter(p -> p % 3 == 1) + .channel("remainderIs1Channel"); + + } + + @Bean + public IntegrationFlow remainderIs2Flow() { + return f -> f.split() + . filter(p -> p % 3 == 2) + .channel("remainderIs2Channel"); + + } + + public static void main(String[] args) { + + final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(SeparateFlowsExample.class); + + DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class); + multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x)); + + DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class); + remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x)); + + DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class); + remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x)); + + ctx.getBean(I.class) + .multipleof3(Arrays.asList(1, 2, 3, 4, 5, 6)); + + ctx.getBean(I.class) + .remainderIs1(Arrays.asList(1, 2, 3, 4, 5, 6)); + + ctx.getBean(I.class) + .remainderIs2(Arrays.asList(1, 2, 3, 4, 5, 6)); + + ctx.close(); + + } + +} diff --git a/spring-integration/src/main/java/com/baeldung/subflows/subflowchannel/FilterExample.java b/spring-integration/src/main/java/com/baeldung/subflows/subflowchannel/FilterExample.java new file mode 100644 index 0000000000..f8034ab5bd --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/subflows/subflowchannel/FilterExample.java @@ -0,0 +1,75 @@ +package com.baeldung.subflows.subflowchannel; + +import java.util.Arrays; +import java.util.Collection; + +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; + +@EnableIntegration +@IntegrationComponentScan +public class FilterExample { + + @MessagingGateway + public interface I { + + @Gateway(requestChannel = "flow.input") + void flow(Collection is); + + } + + @Bean + DirectChannel multipleof3Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs1Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs2Channel() { + return new DirectChannel(); + } + + @Bean + public IntegrationFlow flow() { + return flow -> flow.split() + + . filter(x -> x % 3 == 0, sf -> sf.discardFlow(subf -> subf + + . filter(x -> x % 3 == 1, ssf -> ssf.discardChannel("remainderIs2Channel")) + .channel("remainderIs1Channel") + + )) + + .channel("multipleof3Channel"); + } + + public static void main(String[] args) { + final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(FilterExample.class); + + DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class); + multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x)); + + DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class); + remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x)); + + DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class); + remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x)); + + ctx.getBean(I.class) + .flow(Arrays.asList(1, 2, 3, 4, 5, 6)); + + ctx.close(); + + } +} \ No newline at end of file diff --git a/spring-integration/src/main/java/com/baeldung/subflows/subflowmapping/RouterExample.java b/spring-integration/src/main/java/com/baeldung/subflows/subflowmapping/RouterExample.java new file mode 100644 index 0000000000..cbef3ca219 --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/subflows/subflowmapping/RouterExample.java @@ -0,0 +1,70 @@ +package com.baeldung.subflows.subflowmapping; + +import java.util.Arrays; +import java.util.Collection; + +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; + +@EnableIntegration +@IntegrationComponentScan +public class RouterExample { + @MessagingGateway + public interface I { + + @Gateway(requestChannel = "flow.input") + void flow(Collection is); + + } + + @Bean + DirectChannel multipleof3Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs1Channel() { + return new DirectChannel(); + } + + @Bean + DirectChannel remainderIs2Channel() { + return new DirectChannel(); + } + + @Bean + public IntegrationFlow flow() { + return f -> f.split() + . route(p -> p % 3, m -> m.channelMapping(0, "multipleof3Channel") + .subFlowMapping(1, sf -> sf .channel("remainderIs1Channel")) + .subFlowMapping(2, sf -> sf. handle((p,h)->p))) + .channel("remainderIs2Channel"); + } + + public static void main(String[] args) { + final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(RouterExample.class); + + DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class); + multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x)); + + DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class); + remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x)); + + DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class); + remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x)); + + ctx.getBean(I.class) + .flow(Arrays.asList(1, 2, 3, 4, 5, 6)); + + ctx.close(); + + } + +}