BAEL-2276
This commit is contained in:
parent
758648eb05
commit
261694815a
|
@ -0,0 +1,72 @@
|
||||||
|
package com.baeldung.subflows.publishsubscribechannel;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.integration.annotation.Gateway;
|
||||||
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
|
@EnableIntegration
|
||||||
|
@IntegrationComponentScan
|
||||||
|
public class PublishSubscibeChannelExample {
|
||||||
|
|
||||||
|
@MessagingGateway
|
||||||
|
public interface I {
|
||||||
|
|
||||||
|
@Gateway(requestChannel = "flow.input")
|
||||||
|
void flow(Collection<Integer> is);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel multipleof3Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs1Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs2Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IntegrationFlow flow() {
|
||||||
|
return flow -> flow.split()
|
||||||
|
.publishSubscribeChannel(s ->
|
||||||
|
s.subscribe(f -> f.<Integer> filter(p -> p % 3 == 0).channel("multipleof3Channel"))
|
||||||
|
.subscribe(f -> f.<Integer> filter(p -> p % 3 == 1).channel("remainderIs1Channel"))
|
||||||
|
.subscribe(f -> f.<Integer> filter(p -> p % 3 == 2).channel("remainderIs2Channel"))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(PublishSubscibeChannelExample.class);
|
||||||
|
|
||||||
|
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
||||||
|
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
||||||
|
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
||||||
|
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
||||||
|
|
||||||
|
ctx.getBean(I.class)
|
||||||
|
.flow(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
ctx.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
package com.baeldung.subflows.routeToRecipients;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.integration.annotation.Gateway;
|
||||||
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
|
@EnableIntegration
|
||||||
|
@IntegrationComponentScan
|
||||||
|
public class RouteToRecipientsExample {
|
||||||
|
|
||||||
|
@MessagingGateway
|
||||||
|
public interface I {
|
||||||
|
|
||||||
|
@Gateway(requestChannel = "flow.input")
|
||||||
|
void flow(Collection<Integer> is);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel multipleof3Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs1Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs2Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IntegrationFlow flow() {
|
||||||
|
return flow -> flow.split()
|
||||||
|
|
||||||
|
.routeToRecipients(r -> r.<Integer> recipient("multipleof3Channel", p -> p % 3 == 0)// filter
|
||||||
|
.<Integer> recipient("remainderIs1Channel", p -> p % 3 == 1)
|
||||||
|
.recipientFlow(sf -> sf.<Integer> filter(p -> p % 3 == 2)
|
||||||
|
.channel("remainderIs2Channel")));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(RouteToRecipientsExample.class);
|
||||||
|
|
||||||
|
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
||||||
|
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
||||||
|
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
||||||
|
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
||||||
|
|
||||||
|
ctx.getBean(I.class)
|
||||||
|
.flow(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
ctx.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,99 @@
|
||||||
|
package com.baeldung.subflows.separateflows;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.integration.annotation.Gateway;
|
||||||
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
|
@EnableIntegration
|
||||||
|
@IntegrationComponentScan
|
||||||
|
public class SeparateFlowsExample {
|
||||||
|
|
||||||
|
@MessagingGateway
|
||||||
|
public interface I {
|
||||||
|
|
||||||
|
@Gateway(requestChannel = "multipleof3Flow.input")
|
||||||
|
void multipleof3(Collection<Integer> is);
|
||||||
|
|
||||||
|
@Gateway(requestChannel = "remainderIs1Flow.input")
|
||||||
|
void remainderIs1(Collection<Integer> is);
|
||||||
|
|
||||||
|
@Gateway(requestChannel = "remainderIs2Flow.input")
|
||||||
|
void remainderIs2(Collection<Integer> is);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel multipleof3Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs1Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs2Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IntegrationFlow multipleof3Flow() {
|
||||||
|
return f -> f.split()
|
||||||
|
.<Integer> filter(p -> p % 3 == 0)
|
||||||
|
.channel("multipleof3Channel");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IntegrationFlow remainderIs1Flow() {
|
||||||
|
return f -> f.split()
|
||||||
|
.<Integer> filter(p -> p % 3 == 1)
|
||||||
|
.channel("remainderIs1Channel");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IntegrationFlow remainderIs2Flow() {
|
||||||
|
return f -> f.split()
|
||||||
|
.<Integer> filter(p -> p % 3 == 2)
|
||||||
|
.channel("remainderIs2Channel");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(SeparateFlowsExample.class);
|
||||||
|
|
||||||
|
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
||||||
|
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
||||||
|
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
||||||
|
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
||||||
|
|
||||||
|
ctx.getBean(I.class)
|
||||||
|
.multipleof3(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
ctx.getBean(I.class)
|
||||||
|
.remainderIs1(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
ctx.getBean(I.class)
|
||||||
|
.remainderIs2(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
ctx.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
package com.baeldung.subflows.subflowchannel;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.integration.annotation.Gateway;
|
||||||
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
|
@EnableIntegration
|
||||||
|
@IntegrationComponentScan
|
||||||
|
public class FilterExample {
|
||||||
|
|
||||||
|
@MessagingGateway
|
||||||
|
public interface I {
|
||||||
|
|
||||||
|
@Gateway(requestChannel = "flow.input")
|
||||||
|
void flow(Collection<Integer> is);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel multipleof3Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs1Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs2Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IntegrationFlow flow() {
|
||||||
|
return flow -> flow.split()
|
||||||
|
|
||||||
|
.<Integer> filter(x -> x % 3 == 0, sf -> sf.discardFlow(subf -> subf
|
||||||
|
|
||||||
|
.<Integer> filter(x -> x % 3 == 1, ssf -> ssf.discardChannel("remainderIs2Channel"))
|
||||||
|
.channel("remainderIs1Channel")
|
||||||
|
|
||||||
|
))
|
||||||
|
|
||||||
|
.channel("multipleof3Channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(FilterExample.class);
|
||||||
|
|
||||||
|
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
||||||
|
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
||||||
|
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
||||||
|
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
||||||
|
|
||||||
|
ctx.getBean(I.class)
|
||||||
|
.flow(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
ctx.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,70 @@
|
||||||
|
package com.baeldung.subflows.subflowmapping;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.integration.annotation.Gateway;
|
||||||
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
|
@EnableIntegration
|
||||||
|
@IntegrationComponentScan
|
||||||
|
public class RouterExample {
|
||||||
|
@MessagingGateway
|
||||||
|
public interface I {
|
||||||
|
|
||||||
|
@Gateway(requestChannel = "flow.input")
|
||||||
|
void flow(Collection<Integer> is);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel multipleof3Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs1Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
DirectChannel remainderIs2Channel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IntegrationFlow flow() {
|
||||||
|
return f -> f.split()
|
||||||
|
.<Integer, Integer> route(p -> p % 3, m -> m.channelMapping(0, "multipleof3Channel")
|
||||||
|
.subFlowMapping(1, sf -> sf .channel("remainderIs1Channel"))
|
||||||
|
.subFlowMapping(2, sf -> sf.<Integer> handle((p,h)->p)))
|
||||||
|
.channel("remainderIs2Channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(RouterExample.class);
|
||||||
|
|
||||||
|
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
||||||
|
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
||||||
|
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
||||||
|
|
||||||
|
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
||||||
|
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
||||||
|
|
||||||
|
ctx.getBean(I.class)
|
||||||
|
.flow(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
ctx.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue