SEDA architecture with si and camel (#12588)

* adding getCurrentTime method to TimeAgoCalculatorUnitTest in order to always return the same time and avoid problems related to reading date from local system.
adding two methods to TimeAgoCalculator to always return the same date as the current date in order to avoid problems related to reading current time from local host. One of these two methods accepts time zone

* adding getCurrentTime method to TimeAgoCalculatorUnitTest in order to always return the same time and avoid problems related to reading date from local system.
adding two methods to TimeAgoCalculator to always return the same date as the current date in order to avoid problems related to reading current time from local host. One of these two methods accepts time zone
correcting some formattings
adding comments in code to clarify adding of getCurrentTime methods

* reverting changes to ZuulConfig

* adding seda package to design-patterns-architectural to demonstrate the implementation of SEDA architectural style in spring integration and apache camel in solving word count problem.

* changing version of apache camel to be compatible with java 8

* changing toList() to Collectors.toList() in order to be compatile with java 8

* fixing package names in test classes

* fixing string for scanning integration gateway

* resolving comments on pull request

* resolving comments on pull request

* renaming java file of a class

* changing variable name

* add a line before assertion

* some formatting of fluent API

* using transform instead of service activator

* change name of class

* BAEL-5636 Fixed compilation error and applied Baeldung formatter

Co-authored-by: bipster <openbip@gmail.com>
This commit is contained in:
hesamghiasi 2022-09-20 07:16:47 +04:30 committed by GitHub
parent 062d7026cd
commit b695aa51b5
8 changed files with 380 additions and 0 deletions

View File

@ -41,6 +41,26 @@
<version>${mysql-connector.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>${spring-boot-starter-integration.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<version>${spring-integration-test.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel-core.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-junit5</artifactId>
<version>${camel-test-junit5.version}</version>
</dependency>
</dependencies>
<properties>
@ -48,6 +68,10 @@
<mysql-connector.version>6.0.6</mysql-connector.version>
<spring-boot.version>2.5.3</spring-boot.version>
<rest-assured.version>3.3.0</rest-assured.version>
<spring-boot-starter-integration.version>2.7.2</spring-boot-starter-integration.version>
<spring-integration-test.version>5.5.14</spring-integration-test.version>
<camel-core.version>3.14.0</camel-core.version>
<camel-test-junit5.version>3.14.0</camel-test-junit5.version>
</properties>
</project>

View File

@ -0,0 +1,48 @@
package com.baeldung.seda.apachecamel;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
public class WordCountRoute extends RouteBuilder {
public static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
public static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
public static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
public static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
public static final String returnResponse = "mock:result";
@Override
public void configure() throws Exception {
from(receiveTextUri).to(splitWordsUri);
from(splitWordsUri).transform(ExpressionBuilder.bodyExpression(s -> s.toString()
.split(" ")))
.to(toLowerCaseUri);
from(toLowerCaseUri).split(body(), new StringListAggregationStrategy())
.transform(ExpressionBuilder.bodyExpression(body -> body.toString()
.toLowerCase()))
.end()
.to(countWordsUri);
from(countWordsUri).transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
.to(returnResponse);
}
}
class StringListAggregationStrategy extends AbstractListAggregationStrategy<String> {
@Override
public String getValue(Exchange exchange) {
return exchange.getIn()
.getBody(String.class);
}
}

View File

@ -0,0 +1,56 @@
package com.baeldung.seda.springintegration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
@Configuration
public class ChannelConfiguration {
private final TaskExecutor receiveTextChannelThreadPool;
private final TaskExecutor splitWordsChannelThreadPool;
private final TaskExecutor toLowerCaseChannelThreadPool;
private final TaskExecutor countWordsChannelThreadPool;
private final TaskExecutor returnResponseChannelThreadPool;
public ChannelConfiguration(TaskExecutor receiveTextChannelThreadPool, TaskExecutor splitWordsChannelThreadPool, TaskExecutor toLowerCaseChannelThreadPool, TaskExecutor countWordsChannelThreadPool, TaskExecutor returnResponseChannelThreadPool) {
this.receiveTextChannelThreadPool = receiveTextChannelThreadPool;
this.splitWordsChannelThreadPool = splitWordsChannelThreadPool;
this.toLowerCaseChannelThreadPool = toLowerCaseChannelThreadPool;
this.countWordsChannelThreadPool = countWordsChannelThreadPool;
this.returnResponseChannelThreadPool = returnResponseChannelThreadPool;
}
@Bean(name = "receiveTextChannel")
public MessageChannel getReceiveTextChannel() {
return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
.get();
}
@Bean(name = "splitWordsChannel")
public MessageChannel getSplitWordsChannel() {
return MessageChannels.executor("split-words", splitWordsChannelThreadPool)
.get();
}
@Bean(name = "toLowerCaseChannel")
public MessageChannel getToLowerCaseChannel() {
return MessageChannels.executor("to-lower-case", toLowerCaseChannelThreadPool)
.get();
}
@Bean(name = "countWordsChannel")
public MessageChannel getCountWordsChannel() {
return MessageChannels.executor("count-words", countWordsChannelThreadPool)
.get();
}
@Bean(name = "returnResponseChannel")
public MessageChannel getReturnResponseChannel() {
return MessageChannels.executor("return-response", returnResponseChannelThreadPool)
.get();
}
}

View File

@ -0,0 +1,81 @@
package com.baeldung.seda.springintegration;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@Configuration
@EnableIntegration
public class IntegrationConfiguration {
private final MessageChannel receiveTextChannel;
private final MessageChannel splitWordsChannel;
private final MessageChannel toLowerCaseChannel;
private final MessageChannel countWordsChannel;
private final MessageChannel returnResponseChannel;
private final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");
private final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
private final Function<String, String> toLowerCase = String::toLowerCase;
private final MessageGroupProcessor buildMessageWithListPayload = messageGroup -> MessageBuilder.withPayload(messageGroup.streamMessages()
.map(Message::getPayload)
.collect(Collectors.toList()))
.build();
private final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();
public IntegrationConfiguration(MessageChannel receiveTextChannel, MessageChannel splitWordsChannel, MessageChannel toLowerCaseChannel, MessageChannel countWordsChannel, MessageChannel returnResponseChannel) {
this.receiveTextChannel = receiveTextChannel;
this.splitWordsChannel = splitWordsChannel;
this.toLowerCaseChannel = toLowerCaseChannel;
this.countWordsChannel = countWordsChannel;
this.returnResponseChannel = returnResponseChannel;
}
@Bean
public IntegrationFlow receiveText() {
return IntegrationFlows.from(receiveTextChannel)
.channel(splitWordsChannel)
.get();
}
@Bean
public IntegrationFlow splitWords() {
return IntegrationFlows.from(splitWordsChannel)
.transform(splitWordsFunction)
.channel(toLowerCaseChannel)
.get();
}
@Bean
public IntegrationFlow toLowerCase() {
return IntegrationFlows.from(toLowerCaseChannel)
.split()
.transform(toLowerCase)
.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
.outputProcessor(buildMessageWithListPayload))
.channel(countWordsChannel)
.get();
}
@Bean
public IntegrationFlow countWords() {
return IntegrationFlows.from(countWordsChannel)
.transform(convertArrayListToCountMap)
.channel(returnResponseChannel)
.get();
}
}

View File

@ -0,0 +1,61 @@
package com.baeldung.seda.springintegration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class TaskExecutorConfiguration {
@Bean("receiveTextChannelThreadPool")
public TaskExecutor receiveTextChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("receive-text-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("splitWordsChannelThreadPool")
public TaskExecutor splitWordsChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("split-words-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("toLowerCaseChannelThreadPool")
public TaskExecutor toLowerCaseChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("tto-lower-case-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("countWordsChannelThreadPool")
public TaskExecutor countWordsChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("count-words-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("returnResponseChannelThreadPool")
public TaskExecutor returnResponseChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("return-response-channel-thread-pool");
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.seda.springintegration;
import java.util.Map;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
public Map<String, Long> countWords(String test);
}

View File

@ -0,0 +1,48 @@
package com.baeldung.seda.apachecamel;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
import com.baeldung.seda.apachecamel.WordCountRoute;
public class ApacheCamelSedaIntegrationTest extends CamelTestSupport {
@Test
public void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap() throws InterruptedException {
Map<String, Long> expected = new HashMap<>();
expected.put("my", 1L);
expected.put("name", 1L);
expected.put("is", 1L);
expected.put("hesam", 1L);
getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
template.sendBody(WordCountRoute.receiveTextUri, "My name is Hesam");
assertMockEndpointsSatisfied();
}
@Test
public void givenTextWithDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap() throws InterruptedException {
Map<String, Long> expected = new HashMap<>();
expected.put("the", 3L);
expected.put("dog", 1L);
expected.put("chased", 1L);
expected.put("rabbit", 1L);
expected.put("into", 1L);
expected.put("jungle", 1L);
getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
template.sendBody(WordCountRoute.receiveTextUri, "the dog chased the rabbit into the jungle");
assertMockEndpointsSatisfied();
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
RoutesBuilder wordCountRoute = new WordCountRoute();
return wordCountRoute;
}
}

View File

@ -0,0 +1,50 @@
package com.baeldung.seda.springintegration;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.support.AnnotationConfigContextLoader;
@ExtendWith(SpringExtension.class)
@ContextConfiguration(loader = AnnotationConfigContextLoader.class, classes = { TaskExecutorConfiguration.class, ChannelConfiguration.class, IntegrationConfiguration.class })
@EnableIntegration
@IntegrationComponentScan(basePackages = { "com.baeldung.seda.springintegration" })
public class SpringIntegrationSedaIntegrationTest {
@Autowired
TestGateway testGateway;
@Test
void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
Map<String, Long> actual = testGateway.countWords("My name is Hesam");
Map<String, Long> expected = new HashMap<>();
expected.put("my", 1L);
expected.put("name", 1L);
expected.put("is", 1L);
expected.put("hesam", 1L);
org.junit.Assert.assertEquals(expected, actual);
}
@Test
void givenTextWithDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
Map<String, Long> actual = testGateway.countWords("the dog chased the rabbit into the jungle");
Map<String, Long> expected = new HashMap<>();
expected.put("the", 3L);
expected.put("dog", 1L);
expected.put("chased", 1L);
expected.put("rabbit", 1L);
expected.put("into", 1L);
expected.put("jungle", 1L);
org.junit.Assert.assertEquals(expected, actual);
}
}