Spring Integration Subflows
Issue: BAEL-2276
This commit is contained in:
parent
5145b90099
commit
fcca31572b
@ -0,0 +1,57 @@
|
|||||||
|
package com.baeldung.subflows.discardflow;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.integration.annotation.Gateway;
|
||||||
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
|
@EnableIntegration
|
||||||
|
@IntegrationComponentScan
|
||||||
|
public class FilterExample {
|
||||||
|
@MessagingGateway
|
||||||
|
public interface NumbersClassifier {
|
||||||
|
@Gateway(requestChannel = "classify.input")
|
||||||
|
void classify(Collection<Integer> numbers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
QueueChannel multipleofThreeChannel() {
|
||||||
|
return new QueueChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
QueueChannel remainderIsOneChannel() {
|
||||||
|
return new QueueChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
QueueChannel remainderIsTwoChannel() {
|
||||||
|
return new QueueChannel();
|
||||||
|
}
|
||||||
|
boolean isMultipleOfThree(Integer number) {
|
||||||
|
return number % 3 == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderOne(Integer number) {
|
||||||
|
return number % 3 == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderTwo(Integer number) {
|
||||||
|
return number % 3 == 2;
|
||||||
|
}
|
||||||
|
@Bean
|
||||||
|
public IntegrationFlow classify() {
|
||||||
|
return flow -> flow.split()
|
||||||
|
.<Integer> filter(this::isMultipleOfThree, notMultiple -> notMultiple
|
||||||
|
.discardFlow(oneflow -> oneflow
|
||||||
|
.<Integer> filter(this::isRemainderOne,
|
||||||
|
twoflow -> twoflow .discardChannel("remainderIsTwoChannel"))
|
||||||
|
.channel("remainderIsOneChannel")))
|
||||||
|
.channel("multipleofThreeChannel");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,15 +1,13 @@
|
|||||||
package com.baeldung.subflows.publishsubscribechannel;
|
package com.baeldung.subflows.publishsubscribechannel;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.springframework.context.ConfigurableApplicationContext;
|
|
||||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.integration.annotation.Gateway;
|
import org.springframework.integration.annotation.Gateway;
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
import org.springframework.integration.annotation.MessagingGateway;
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
import org.springframework.integration.channel.DirectChannel;
|
|
||||||
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
import org.springframework.integration.config.EnableIntegration;
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
import org.springframework.integration.dsl.IntegrationFlow;
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
@ -18,46 +16,45 @@ import org.springframework.integration.dsl.IntegrationFlow;
|
|||||||
public class PublishSubscibeChannelExample {
|
public class PublishSubscibeChannelExample {
|
||||||
@MessagingGateway
|
@MessagingGateway
|
||||||
public interface NumbersClassifier {
|
public interface NumbersClassifier {
|
||||||
@Gateway(requestChannel = "flow.input")
|
@Gateway(requestChannel = "classify.input")
|
||||||
void flow(Collection<Integer> numbers);
|
void classify(Collection<Integer> numbers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel multipleof3Channel() {
|
QueueChannel multipleofThreeChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel remainderIs1Channel() {
|
QueueChannel remainderIsOneChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel remainderIs2Channel() {
|
QueueChannel remainderIsTwoChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
|
}
|
||||||
|
boolean isMultipleOfThree(Integer number) {
|
||||||
|
return number % 3 == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean isRemainderOne(Integer number) {
|
||||||
|
return number % 3 == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderTwo(Integer number) {
|
||||||
|
return number % 3 == 2;
|
||||||
|
}
|
||||||
@Bean
|
@Bean
|
||||||
public IntegrationFlow flow() {
|
public IntegrationFlow classify() {
|
||||||
return flow -> flow.split()
|
return flow -> flow.split()
|
||||||
.publishSubscribeChannel(s -> s.subscribe(f -> f.<Integer> filter(p -> p % 3 == 0)
|
.publishSubscribeChannel(subscription -> subscription.subscribe(subflow -> subflow.<Integer> filter(this::isMultipleOfThree)
|
||||||
.channel("multipleof3Channel"))
|
.channel("multipleofThreeChannel"))
|
||||||
.subscribe(f -> f.<Integer> filter(p -> p % 3 == 1)
|
.subscribe(subflow -> subflow.<Integer> filter(this::isRemainderOne)
|
||||||
.channel("remainderIs1Channel"))
|
.channel("remainderIsOneChannel"))
|
||||||
.subscribe(f -> f.<Integer> filter(p -> p % 3 == 2)
|
.subscribe(subflow -> subflow.<Integer> filter(this::isRemainderTwo)
|
||||||
.channel("remainderIs2Channel")));
|
.channel("remainderIsTwoChannel")));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(PublishSubscibeChannelExample.class);
|
|
||||||
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
|
||||||
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
|
||||||
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
|
||||||
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
|
||||||
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
|
||||||
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
|
||||||
ctx.getBean(NumbersClassifier.class)
|
|
||||||
.flow(Arrays.asList(1, 2, 3, 4, 5, 6));
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,15 +1,12 @@
|
|||||||
package com.baeldung.subflows.routeToRecipients;
|
package com.baeldung.subflows.routetorecipients;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.springframework.context.ConfigurableApplicationContext;
|
|
||||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.integration.annotation.Gateway;
|
import org.springframework.integration.annotation.Gateway;
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
import org.springframework.integration.annotation.MessagingGateway;
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
import org.springframework.integration.config.EnableIntegration;
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
import org.springframework.integration.dsl.IntegrationFlow;
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
@ -18,44 +15,46 @@ import org.springframework.integration.dsl.IntegrationFlow;
|
|||||||
public class RouteToRecipientsExample {
|
public class RouteToRecipientsExample {
|
||||||
@MessagingGateway
|
@MessagingGateway
|
||||||
public interface NumbersClassifier {
|
public interface NumbersClassifier {
|
||||||
@Gateway(requestChannel = "flow.input")
|
@Gateway(requestChannel = "classify.input")
|
||||||
void flow(Collection<Integer> numbers);
|
void classify(Collection<Integer> numbers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel multipleof3Channel() {
|
QueueChannel multipleofThreeChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel remainderIs1Channel() {
|
QueueChannel remainderIsOneChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel remainderIs2Channel() {
|
QueueChannel remainderIsTwoChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
|
}
|
||||||
|
boolean isMultipleOfThree(Integer number) {
|
||||||
|
return number % 3 == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderOne(Integer number) {
|
||||||
|
return number % 3 == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderTwo(Integer number) {
|
||||||
|
return number % 3 == 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public IntegrationFlow flow() {
|
public IntegrationFlow classify() {
|
||||||
return flow -> flow.split()
|
return flow -> flow.split()
|
||||||
.routeToRecipients(r -> r.<Integer> recipient("multipleof3Channel", p -> p % 3 == 0)// filter
|
.routeToRecipients(route -> route
|
||||||
.<Integer> recipient("remainderIs1Channel", p -> p % 3 == 1)
|
.recipientFlow(subflow -> subflow
|
||||||
.recipientFlow(sf -> sf.<Integer> filter(p -> p % 3 == 2)
|
.<Integer> filter(this::isMultipleOfThree)
|
||||||
.channel("remainderIs2Channel")));
|
.channel("multipleofThreeChannel"))
|
||||||
|
.<Integer> recipient("remainderIsOneChannel",this::isRemainderOne)
|
||||||
|
.<Integer> recipient("remainderIsTwoChannel",this::isRemainderTwo));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(RouteToRecipientsExample.class);
|
|
||||||
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
|
||||||
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
|
||||||
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
|
||||||
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
|
||||||
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
|
||||||
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
|
||||||
ctx.getBean(NumbersClassifier.class)
|
|
||||||
.flow(Arrays.asList(1, 2, 3, 4, 5, 6));
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -1,15 +1,13 @@
|
|||||||
package com.baeldung.subflows.separateflows;
|
package com.baeldung.subflows.separateflows;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.springframework.context.ConfigurableApplicationContext;
|
|
||||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.integration.annotation.Gateway;
|
import org.springframework.integration.annotation.Gateway;
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
import org.springframework.integration.annotation.MessagingGateway;
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
import org.springframework.integration.channel.DirectChannel;
|
|
||||||
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
import org.springframework.integration.config.EnableIntegration;
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
import org.springframework.integration.dsl.IntegrationFlow;
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
@ -18,66 +16,62 @@ import org.springframework.integration.dsl.IntegrationFlow;
|
|||||||
public class SeparateFlowsExample {
|
public class SeparateFlowsExample {
|
||||||
@MessagingGateway
|
@MessagingGateway
|
||||||
public interface NumbersClassifier {
|
public interface NumbersClassifier {
|
||||||
@Gateway(requestChannel = "multipleof3Flow.input")
|
@Gateway(requestChannel = "multipleOfThreeFlow.input")
|
||||||
void multipleof3(Collection<Integer> is);
|
void multipleofThree(Collection<Integer> numbers);
|
||||||
|
|
||||||
@Gateway(requestChannel = "remainderIs1Flow.input")
|
@Gateway(requestChannel = "remainderIsOneFlow.input")
|
||||||
void remainderIs1(Collection<Integer> is);
|
void remainderIsOne(Collection<Integer> numbers);
|
||||||
|
|
||||||
@Gateway(requestChannel = "remainderIs2Flow.input")
|
@Gateway(requestChannel = "remainderIsTwoFlow.input")
|
||||||
void remainderIs2(Collection<Integer> numbers);
|
void remainderIsTwo(Collection<Integer> numbers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel multipleof3Channel() {
|
QueueChannel multipleOfThreeChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel remainderIs1Channel() {
|
QueueChannel remainderIsOneChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel remainderIs2Channel() {
|
QueueChannel remainderIsTwoChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isMultipleOfThree(Integer number) {
|
||||||
|
return number % 3 == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderOne(Integer number) {
|
||||||
|
return number % 3 == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderTwo(Integer number) {
|
||||||
|
return number % 3 == 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public IntegrationFlow multipleof3Flow() {
|
public IntegrationFlow multipleOfThreeFlow() {
|
||||||
return f -> f.split()
|
return flow -> flow.split()
|
||||||
.<Integer> filter(p -> p % 3 == 0)
|
.<Integer> filter(this::isMultipleOfThree)
|
||||||
.channel("multipleof3Channel");
|
.channel("multipleOfThreeChannel");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public IntegrationFlow remainderIs1Flow() {
|
public IntegrationFlow remainderIsOneFlow() {
|
||||||
return f -> f.split()
|
return flow -> flow.split()
|
||||||
.<Integer> filter(p -> p % 3 == 1)
|
.<Integer> filter(this::isRemainderOne)
|
||||||
.channel("remainderIs1Channel");
|
.channel("remainderIsOneChannel");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public IntegrationFlow remainderIs2Flow() {
|
public IntegrationFlow remainderIsTwoFlow() {
|
||||||
return f -> f.split()
|
return flow -> flow.split()
|
||||||
.<Integer> filter(p -> p % 3 == 2)
|
.<Integer> filter(this::isRemainderTwo)
|
||||||
.channel("remainderIs2Channel");
|
.channel("remainderIsTwoChannel");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(SeparateFlowsExample.class);
|
|
||||||
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
|
||||||
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
|
||||||
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
|
||||||
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
|
||||||
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
|
||||||
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
|
||||||
ctx.getBean(NumbersClassifier.class)
|
|
||||||
.multipleof3(Arrays.asList(1, 2, 3, 4, 5, 6));
|
|
||||||
ctx.getBean(NumbersClassifier.class)
|
|
||||||
.remainderIs1(Arrays.asList(1, 2, 3, 4, 5, 6));
|
|
||||||
ctx.getBean(NumbersClassifier.class)
|
|
||||||
.remainderIs2(Arrays.asList(1, 2, 3, 4, 5, 6));
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -1,60 +0,0 @@
|
|||||||
package com.baeldung.subflows.subflowchannel;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
import org.springframework.context.ConfigurableApplicationContext;
|
|
||||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.integration.annotation.Gateway;
|
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan;
|
|
||||||
import org.springframework.integration.annotation.MessagingGateway;
|
|
||||||
import org.springframework.integration.channel.DirectChannel;
|
|
||||||
import org.springframework.integration.config.EnableIntegration;
|
|
||||||
import org.springframework.integration.dsl.IntegrationFlow;
|
|
||||||
|
|
||||||
@EnableIntegration
|
|
||||||
@IntegrationComponentScan
|
|
||||||
public class FilterExample {
|
|
||||||
@MessagingGateway
|
|
||||||
public interface NumbersClassifier {
|
|
||||||
@Gateway(requestChannel = "flow.input")
|
|
||||||
void flow(Collection<Integer> numbers);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
DirectChannel multipleof3Channel() {
|
|
||||||
return new DirectChannel();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
DirectChannel remainderIs1Channel() {
|
|
||||||
return new DirectChannel();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
DirectChannel remainderIs2Channel() {
|
|
||||||
return new DirectChannel();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public IntegrationFlow flow() {
|
|
||||||
return flow -> flow.split()
|
|
||||||
.<Integer> filter(x -> x % 3 == 0, sf -> sf.discardFlow(subf -> subf.<Integer> filter(x -> x % 3 == 1, ssf -> ssf.discardChannel("remainderIs2Channel"))
|
|
||||||
.channel("remainderIs1Channel")))
|
|
||||||
.channel("multipleof3Channel");
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(FilterExample.class);
|
|
||||||
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
|
||||||
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
|
||||||
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
|
||||||
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
|
||||||
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
|
||||||
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
|
||||||
ctx.getBean(NumbersClassifier.class)
|
|
||||||
.flow(Arrays.asList(1, 2, 3, 4, 5, 6));
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,15 +1,11 @@
|
|||||||
package com.baeldung.subflows.subflowmapping;
|
package com.baeldung.subflows.subflowmapping;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.springframework.context.ConfigurableApplicationContext;
|
|
||||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.integration.annotation.Gateway;
|
import org.springframework.integration.annotation.Gateway;
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||||
import org.springframework.integration.annotation.MessagingGateway;
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
import org.springframework.integration.config.EnableIntegration;
|
import org.springframework.integration.config.EnableIntegration;
|
||||||
import org.springframework.integration.dsl.IntegrationFlow;
|
import org.springframework.integration.dsl.IntegrationFlow;
|
||||||
|
|
||||||
@ -18,44 +14,49 @@ import org.springframework.integration.dsl.IntegrationFlow;
|
|||||||
public class RouterExample {
|
public class RouterExample {
|
||||||
@MessagingGateway
|
@MessagingGateway
|
||||||
public interface NumbersClassifier {
|
public interface NumbersClassifier {
|
||||||
@Gateway(requestChannel = "flow.input")
|
@Gateway(requestChannel = "classify.input")
|
||||||
void flow(Collection<Integer> numbers);
|
void classify(Collection<Integer> numbers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel multipleof3Channel() {
|
QueueChannel multipleofThreeChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel remainderIs1Channel() {
|
QueueChannel remainderIsOneChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
DirectChannel remainderIs2Channel() {
|
QueueChannel remainderIsTwoChannel() {
|
||||||
return new DirectChannel();
|
return new QueueChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isMultipleOfThree(Integer number) {
|
||||||
|
return number % 3 == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderOne(Integer number) {
|
||||||
|
return number % 3 == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRemainderTwo(Integer number) {
|
||||||
|
return number % 3 == 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public IntegrationFlow flow() {
|
public IntegrationFlow classify() {
|
||||||
return f -> f.split()
|
return flow -> flow.split()
|
||||||
.<Integer, Integer> route(p -> p % 3, m -> m.channelMapping(0, "multipleof3Channel")
|
.<Integer, Integer> route(number -> number % 3,
|
||||||
.subFlowMapping(1, sf -> sf.channel("remainderIs1Channel"))
|
mapping -> mapping
|
||||||
.subFlowMapping(2, sf -> sf.<Integer> handle((p, h) -> p)))
|
.channelMapping(0, "multipleofThreeChannel")
|
||||||
.channel("remainderIs2Channel");
|
.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))
|
||||||
|
.subFlowMapping(2, subflow -> subflow
|
||||||
|
.<Integer> handle((payload, headers) -> {
|
||||||
|
// do extra work on the payload
|
||||||
|
return payload;
|
||||||
|
}))).channel("remainderIsTwoChannel");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
final ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(RouterExample.class);
|
|
||||||
DirectChannel multipleof3Channel = ctx.getBean("multipleof3Channel", DirectChannel.class);
|
|
||||||
multipleof3Channel.subscribe(x -> System.out.println("multipleof3Channel: " + x));
|
|
||||||
DirectChannel remainderIs1Channel = ctx.getBean("remainderIs1Channel", DirectChannel.class);
|
|
||||||
remainderIs1Channel.subscribe(x -> System.out.println("remainderIs1Channel: " + x));
|
|
||||||
DirectChannel remainderIs2Channel = ctx.getBean("remainderIs2Channel", DirectChannel.class);
|
|
||||||
remainderIs2Channel.subscribe(x -> System.out.println("remainderIs2Channel: " + x));
|
|
||||||
ctx.getBean(NumbersClassifier.class)
|
|
||||||
.flow(Arrays.asList(1, 2, 3, 4, 5, 6));
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -0,0 +1,62 @@
|
|||||||
|
package com.baeldung.subflows.discardflow;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
|
||||||
|
import com.baeldung.subflows.discardflow.FilterExample.NumbersClassifier;
|
||||||
|
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
@ContextConfiguration(classes = { FilterExample.class })
|
||||||
|
public class FilterUnitTest {
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel multipleofThreeChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsOneChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsTwoChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NumbersClassifier numbersClassifier;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
|
||||||
|
|
||||||
|
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
Message<?> outMessage = multipleofThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 3);
|
||||||
|
|
||||||
|
outMessage = multipleofThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 6);
|
||||||
|
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 1);
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 4);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 2);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 5);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,62 @@
|
|||||||
|
package com.baeldung.subflows.publishsubscribechannel;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
|
||||||
|
import com.baeldung.subflows.publishsubscribechannel.PublishSubscibeChannelExample.NumbersClassifier;
|
||||||
|
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
@ContextConfiguration(classes = { PublishSubscibeChannelExample.class })
|
||||||
|
public class PublishSubscribeChannelUnitTest {
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel multipleofThreeChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsOneChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsTwoChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NumbersClassifier numbersClassifier;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
|
||||||
|
|
||||||
|
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
Message<?> outMessage = multipleofThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 3);
|
||||||
|
|
||||||
|
outMessage = multipleofThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 6);
|
||||||
|
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 1);
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 4);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 2);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 5);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,63 @@
|
|||||||
|
package com.baeldung.subflows.routetorecipients;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
|
||||||
|
import com.baeldung.subflows.routetorecipients.RouteToRecipientsExample;
|
||||||
|
import com.baeldung.subflows.routetorecipients.RouteToRecipientsExample.NumbersClassifier;
|
||||||
|
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
@ContextConfiguration(classes = { RouteToRecipientsExample.class })
|
||||||
|
public class RouteToRecipientsUnitTest {
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel multipleofThreeChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsOneChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsTwoChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NumbersClassifier numbersClassifier;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
|
||||||
|
|
||||||
|
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
Message<?> outMessage = multipleofThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 3);
|
||||||
|
|
||||||
|
outMessage = multipleofThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 6);
|
||||||
|
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 1);
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 4);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 2);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 5);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,75 @@
|
|||||||
|
package com.baeldung.subflows.separateflows;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import com.baeldung.subflows.separateflows.SeparateFlowsExample.NumbersClassifier;
|
||||||
|
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
@ContextConfiguration(classes = { SeparateFlowsExample.class })
|
||||||
|
public class SeparateFlowsUnitTest {
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel multipleOfThreeChannel;
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsOneChannel;
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsTwoChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NumbersClassifier numbersClassifier;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
|
||||||
|
|
||||||
|
numbersClassifier.multipleofThree(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
Message<?> outMessage = multipleOfThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 3);
|
||||||
|
|
||||||
|
outMessage = multipleOfThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 6);
|
||||||
|
outMessage = multipleOfThreeChannel.receive(0);
|
||||||
|
assertNull(outMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendMessagesToRemainderIs1Flow_thenOutputRemainderIs1() {
|
||||||
|
|
||||||
|
numbersClassifier.remainderIsOne(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
Message<?> outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 1);
|
||||||
|
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 4);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendMessagesToRemainderIs2Flow_thenOutputRemainderIs2() {
|
||||||
|
|
||||||
|
numbersClassifier.remainderIsTwo(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
Message<?> outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 2);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 5);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,62 @@
|
|||||||
|
package com.baeldung.subflows.subflowmapping;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.integration.channel.QueueChannel;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
|
||||||
|
import com.baeldung.subflows.subflowmapping.RouterExample.NumbersClassifier;
|
||||||
|
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
@ContextConfiguration(classes = { RouterExample.class })
|
||||||
|
public class RouterUnitTest {
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel multipleofThreeChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsOneChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private QueueChannel remainderIsTwoChannel;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NumbersClassifier numbersClassifier;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
|
||||||
|
|
||||||
|
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
|
||||||
|
|
||||||
|
Message<?> outMessage = multipleofThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 3);
|
||||||
|
|
||||||
|
outMessage = multipleofThreeChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 6);
|
||||||
|
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 1);
|
||||||
|
outMessage = remainderIsOneChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 4);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 2);
|
||||||
|
|
||||||
|
outMessage = remainderIsTwoChannel.receive(0);
|
||||||
|
|
||||||
|
assertEquals(outMessage.getPayload(), 5);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user