BAEL-20552: Migrate spring-webflux-amqp module to the com.baeldung package

This commit is contained in:
Krzysztof Woyke 2019-12-27 14:43:10 +01:00
parent 411140cceb
commit 066ac49488
7 changed files with 71 additions and 74 deletions

View File

@ -2,7 +2,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>org.baeldung.spring</groupId> <groupId>com.baeldung.spring</groupId>
<artifactId>spring-webflux-amqp</artifactId> <artifactId>spring-webflux-amqp</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
<name>spring-webflux-amqp</name> <name>spring-webflux-amqp</name>

View File

@ -1,11 +1,9 @@
package org.baeldung.spring.amqp; package com.baeldung.spring.amqp;
import java.time.Duration; import java.time.Duration;
import java.util.Date;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.baeldung.spring.amqp.DestinationsConfig.DestinationInfo;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpAdmin;
@ -29,7 +27,6 @@ import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@RestController @RestController
public class AmqpReactiveController { public class AmqpReactiveController {
@ -105,7 +102,7 @@ public class AmqpReactiveController {
public Mono<ResponseEntity<?>> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) { public Mono<ResponseEntity<?>> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) {
// Lookup exchange details // Lookup exchange details
final DestinationInfo d = destinationsConfig.getQueues() final DestinationsConfig.DestinationInfo d = destinationsConfig.getQueues()
.get(name); .get(name);
if (d == null) { if (d == null) {
@ -135,7 +132,7 @@ public class AmqpReactiveController {
@GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) { public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {
DestinationInfo d = destinationsConfig.getQueues() DestinationsConfig.DestinationInfo d = destinationsConfig.getQueues()
.get(name); .get(name);
if (d == null) { if (d == null) {
@ -201,7 +198,7 @@ public class AmqpReactiveController {
public Mono<ResponseEntity<?>> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) { public Mono<ResponseEntity<?>> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) {
// Lookup exchange details // Lookup exchange details
final DestinationInfo d = destinationsConfig.getTopics() final DestinationsConfig.DestinationInfo d = destinationsConfig.getTopics()
.get(name); .get(name);
if (d == null) { if (d == null) {
// Destination not found. // Destination not found.
@ -223,7 +220,7 @@ public class AmqpReactiveController {
@GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) { public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
DestinationInfo d = destinationsConfig.getTopics() DestinationsConfig.DestinationInfo d = destinationsConfig.getTopics()
.get(name); .get(name);
if (d == null) { if (d == null) {
@ -281,7 +278,7 @@ public class AmqpReactiveController {
} }
private Queue createTopicQueue(DestinationInfo destination) { private Queue createTopicQueue(DestinationsConfig.DestinationInfo destination) {
Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange())
.durable(true) .durable(true)

View File

@ -1,59 +1,59 @@
package org.baeldung.spring.amqp; package com.baeldung.spring.amqp;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("destinations") @ConfigurationProperties("destinations")
public class DestinationsConfig { public class DestinationsConfig {
private Map<String,DestinationInfo> queues = new HashMap<>(); private Map<String,DestinationInfo> queues = new HashMap<>();
private Map<String,DestinationInfo> topics = new HashMap<>(); private Map<String,DestinationInfo> topics = new HashMap<>();
public Map<String, DestinationInfo> getQueues() { public Map<String, DestinationInfo> getQueues() {
return queues; return queues;
} }
public void setQueues(Map<String, DestinationInfo> queues) { public void setQueues(Map<String, DestinationInfo> queues) {
this.queues = queues; this.queues = queues;
} }
public Map<String, DestinationInfo> getTopics() { public Map<String, DestinationInfo> getTopics() {
return topics; return topics;
} }
public void setTopics(Map<String, DestinationInfo> topics) { public void setTopics(Map<String, DestinationInfo> topics) {
this.topics = topics; this.topics = topics;
} }
// DestinationInfo stores the Exchange name and routing key used // DestinationInfo stores the Exchange name and routing key used
// by our producers when posting messages // by our producers when posting messages
static class DestinationInfo { static class DestinationInfo {
private String exchange; private String exchange;
private String routingKey; private String routingKey;
public String getExchange() { public String getExchange() {
return exchange; return exchange;
} }
public void setExchange(String exchange) { public void setExchange(String exchange) {
this.exchange = exchange; this.exchange = exchange;
} }
public String getRoutingKey() { public String getRoutingKey() {
return routingKey; return routingKey;
} }
public void setRoutingKey(String routingKey) { public void setRoutingKey(String routingKey) {
this.routingKey = routingKey; this.routingKey = routingKey;
} }
} }
} }

View File

@ -1,4 +1,4 @@
package org.baeldung.spring.amqp; package com.baeldung.spring.amqp;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;

View File

@ -1,4 +1,4 @@
package org.baeldung.spring.amqp; package com.baeldung.spring.amqp;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@ -1,6 +1,6 @@
package org.baeldung; package com.baeldung;
import org.baeldung.spring.amqp.SpringWebfluxAmqpApplication; import com.baeldung.spring.amqp.SpringWebfluxAmqpApplication;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;

View File

@ -1,4 +1,4 @@
package org.baeldung.spring.amqp; package com.baeldung.spring.amqp;
import org.junit.Test; import org.junit.Test;
import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.test.web.reactive.server.WebTestClient;