BAEL-719: Intro to Spring Cloud Stream
This commit is contained in:
parent
310088914c
commit
57a993de64
|
@ -16,6 +16,7 @@
|
|||
<module>spring-cloud-rest</module>
|
||||
<module>spring-cloud-zookeeper</module>
|
||||
<module>spring-cloud-gateway</module>
|
||||
<module>spring-cloud-stream</module>
|
||||
</modules>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
|
@ -37,6 +38,7 @@
|
|||
<spring-cloud-starter-eureka.version>1.2.3.RELEASE</spring-cloud-starter-eureka.version>
|
||||
<spring-cloud-starter-feign.version>1.2.3.RELEASE</spring-cloud-starter-feign.version>
|
||||
<spring-cloud-starter-hystrix.version>1.2.3.RELEASE</spring-cloud-starter-hystrix.version>
|
||||
<spring-cloud-stream.version>1.3.0.RELEASE</spring-cloud-stream.version>
|
||||
<spring-boot-starter-web.version>1.4.2.RELEASE</spring-boot-starter-web.version>
|
||||
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
|
||||
<spring-boot-maven-plugin.version>1.4.2.RELEASE</spring-boot-maven-plugin.version>
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.baeldung</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-rabbit</module>
|
||||
</modules>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<name>spring-cloud-stream</name>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung.spring.cloud</groupId>
|
||||
<artifactId>spring-cloud</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-test-support</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${maven-compiler-plugin.version}</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<version>${spring-boot-maven-plugin.version}</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,33 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>spring-cloud-stream-rabbit</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>spring-cloud-stream-rabbit</name>
|
||||
<description>Simple Spring Cloud Stream</description>
|
||||
|
||||
<parent>
|
||||
<groupId>org.baeldung</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-test-support</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,38 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableBinding(MyProcessor.class)
|
||||
public class MultipleOutputsServiceApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(MultipleOutputsServiceApplication.class, args);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private MyProcessor processor;
|
||||
|
||||
@StreamListener(MyProcessor.INPUT)
|
||||
public void routeValues(Integer val) {
|
||||
if (val < 10) {
|
||||
processor.anOutput()
|
||||
.send(message(val));
|
||||
} else {
|
||||
processor.anotherOutput()
|
||||
.send(message(val));
|
||||
}
|
||||
}
|
||||
|
||||
private static final <T> Message<T> message(T val) {
|
||||
return MessageBuilder.withPayload(val)
|
||||
.build();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableBinding(MyProcessor.class)
|
||||
public class MultipleOutputsWithConditionsServiceApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(MultipleOutputsWithConditionsServiceApplication.class, args);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private MyProcessor processor;
|
||||
|
||||
@StreamListener(target = MyProcessor.INPUT, condition = "payload < 10")
|
||||
public void routeValuesToAnOutput(Integer val) {
|
||||
processor.anOutput()
|
||||
.send(message(val));
|
||||
}
|
||||
|
||||
@StreamListener(target = MyProcessor.INPUT, condition = "payload >= 10")
|
||||
public void routeValuesToAnotherOutput(Integer val) {
|
||||
processor.anotherOutput()
|
||||
.send(message(val));
|
||||
}
|
||||
|
||||
private static final <T> Message<T> message(T val) {
|
||||
return MessageBuilder.withPayload(val)
|
||||
.build();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.messaging.Processor;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
|
||||
import com.baeldung.spring.cloud.stream.rabbit.messages.TextPlainMessageConverter;
|
||||
import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Processor.class)
|
||||
public class MyLoggerServiceApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(MyLoggerServiceApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessageConverter providesTextPlainMessageConverter() {
|
||||
return new TextPlainMessageConverter();
|
||||
}
|
||||
|
||||
@StreamListener(Processor.INPUT)
|
||||
@SendTo(Processor.OUTPUT)
|
||||
public LogMessage enrichLogMessage(LogMessage log) {
|
||||
return new LogMessage(String.format("[1]: %s", log.getMessage()));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit.messages;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.converter.AbstractMessageConverter;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
|
||||
|
||||
public class TextPlainMessageConverter extends AbstractMessageConverter {
|
||||
|
||||
public TextPlainMessageConverter() {
|
||||
super(new MimeType("text", "plain"));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supports(Class<?> clazz) {
|
||||
return (LogMessage.class == clazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
|
||||
Object payload = message.getPayload();
|
||||
String text = payload instanceof String ? (String) payload : new String((byte[]) payload);
|
||||
return new LogMessage(text);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class LogMessage implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -5857383701708275796L;
|
||||
|
||||
private String message;
|
||||
|
||||
public LogMessage() {
|
||||
|
||||
}
|
||||
|
||||
public LogMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public void setMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return message;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit.processor;
|
||||
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
public interface MyProcessor {
|
||||
String INPUT = "myInput";
|
||||
|
||||
@Input
|
||||
SubscribableChannel myInput();
|
||||
|
||||
@Output("myOutput")
|
||||
MessageChannel anOutput();
|
||||
|
||||
@Output
|
||||
MessageChannel anotherOutput();
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
input:
|
||||
destination: queue.log.messages
|
||||
binder: local_rabbit
|
||||
group: logMessageConsumers
|
||||
output:
|
||||
destination: queue.pretty.log.messages
|
||||
binder: local_rabbit
|
||||
binders:
|
||||
local_rabbit:
|
||||
type: rabbit
|
||||
environment:
|
||||
spring:
|
||||
rabbitmq:
|
||||
host: localhost
|
||||
port: 5672
|
||||
username: guest
|
||||
password: guest
|
||||
virtual-host: /
|
||||
server:
|
||||
port: 0
|
||||
management:
|
||||
health:
|
||||
binders:
|
||||
enabled: true
|
|
@ -0,0 +1,52 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.test.binder.MessageCollector;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(classes = MultipleOutputsServiceApplication.class)
|
||||
@DirtiesContext
|
||||
public class MultipleOutputsServiceApplicationTests {
|
||||
|
||||
@Autowired
|
||||
private MyProcessor pipe;
|
||||
|
||||
@Autowired
|
||||
private MessageCollector messageCollector;
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMessageInAnOutput() {
|
||||
whenSendMessage(1);
|
||||
thenPayloadInChannelIs(pipe.anOutput(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMessageInAnAnotherOutput() {
|
||||
whenSendMessage(11);
|
||||
thenPayloadInChannelIs(pipe.anotherOutput(), 11);
|
||||
}
|
||||
|
||||
private void whenSendMessage(Integer val) {
|
||||
pipe.myInput()
|
||||
.send(MessageBuilder.withPayload(val)
|
||||
.build());
|
||||
}
|
||||
|
||||
private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
|
||||
Object payload = messageCollector.forChannel(channel)
|
||||
.poll()
|
||||
.getPayload();
|
||||
assertEquals(expectedValue, payload);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.test.binder.MessageCollector;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(classes = MultipleOutputsWithConditionsServiceApplication.class)
|
||||
@DirtiesContext
|
||||
public class MultipleOutputsWithConditionsServiceApplicationTests {
|
||||
|
||||
@Autowired
|
||||
private MyProcessor pipe;
|
||||
|
||||
@Autowired
|
||||
private MessageCollector messageCollector;
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMessageInAnOutput() {
|
||||
whenSendMessage(1);
|
||||
thenPayloadInChannelIs(pipe.anotherOutput(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReceiveMessageInAnAnotherOutput() {
|
||||
whenSendMessage(11);
|
||||
thenPayloadInChannelIs(pipe.anotherOutput(), 11);
|
||||
}
|
||||
|
||||
private void whenSendMessage(Integer val) {
|
||||
pipe.myInput()
|
||||
.send(MessageBuilder.withPayload(val)
|
||||
.build());
|
||||
}
|
||||
|
||||
private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
|
||||
Object payload = messageCollector.forChannel(channel)
|
||||
.poll()
|
||||
.getPayload();
|
||||
assertEquals(expectedValue, payload);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
package com.baeldung.spring.cloud.stream.rabbit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.stream.messaging.Processor;
|
||||
import org.springframework.cloud.stream.test.binder.MessageCollector;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import com.baeldung.spring.cloud.stream.rabbit.MyLoggerServiceApplication;
|
||||
import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
|
||||
@DirtiesContext
|
||||
public class MyLoggerApplicationTests {
|
||||
|
||||
@Autowired
|
||||
private Processor pipe;
|
||||
|
||||
@Autowired
|
||||
private MessageCollector messageCollector;
|
||||
|
||||
@Test
|
||||
public void shouldEnrichMessage() {
|
||||
// Send message
|
||||
pipe.input()
|
||||
.send(MessageBuilder.withPayload(new LogMessage("This is my message"))
|
||||
.build());
|
||||
|
||||
// Get response from the service
|
||||
Object payload = messageCollector.forChannel(pipe.output())
|
||||
.poll()
|
||||
.getPayload();
|
||||
|
||||
// Assert
|
||||
assertEquals("[1]: This is my message", payload.toString());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue