"BAEL-4894: Wire tap (EIP Patterns)" (#10781)
This commit is contained in:
parent
6877e54e1a
commit
74805c8ada
|
@ -0,0 +1 @@
|
||||||
|
/product-service/
|
|
@ -0,0 +1,62 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>patterns</artifactId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<artifactId>enterprise-patterns</artifactId>
|
||||||
|
<packaging>pom</packaging>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<camel.version>3.7.4</camel.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.camel.springboot</groupId>
|
||||||
|
<artifactId>camel-spring-boot-starter</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.camel.springboot</groupId>
|
||||||
|
<artifactId>camel-activemq-starter</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Test -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
<version>2.2.2.RELEASE</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.camel</groupId>
|
||||||
|
<artifactId>camel-test-spring-junit5</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<dependencyManagement>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.camel.springboot</groupId>
|
||||||
|
<artifactId>camel-spring-boot-dependencies</artifactId>
|
||||||
|
<version>${camel.version}</version>
|
||||||
|
<type>pom</type>
|
||||||
|
<scope>import</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</dependencyManagement>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
</project>
|
|
@ -0,0 +1,34 @@
|
||||||
|
# Wire Tap Pattern
|
||||||
|
|
||||||
|
The application shows you how to use a Wire Tap to monitor, debug or troubleshoot messages flowing through the system, without permanently consuming them off, or making any changes to the expected message in the output channel.
|
||||||
|
|
||||||
|
This example shows how to implement this with a simple Apache Camel application using Spring Boot and Apache ActiveMq.
|
||||||
|
For convenience, we are using in-memory activeMq.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### Configuring and using the Connection Factory
|
||||||
|
|
||||||
|
1. Create CamelContext.
|
||||||
|
2. Connect to embedded (or remote) ActiveMQ JMS broker.
|
||||||
|
3. Add JMS queue to CamelContext.
|
||||||
|
4. Load file orders (xml/csv) from src/data into the JMS queue.
|
||||||
|
5. Based on the extension of the incoming file message, route to the respective queues.
|
||||||
|
6. Test that the destination route is working.
|
||||||
|
7. Audit the received file (order) from the wire tap queue.
|
||||||
|
|
||||||
|
### How to run the example:
|
||||||
|
|
||||||
|
mvn spring-boot:run
|
||||||
|
|
||||||
|
|
||||||
|
The Wire Tap processor, by default, makes a shallow copy of the Camel Exchange instance. The copy of the exchange is sent to the endpoint specified in the wireTap statement. The body of the wire tapped message contains the same object as that in the original message which means any change to the internal state of that object during the wire tap route may also end up changing the main message’s body.
|
||||||
|
|
||||||
|
To solve this, we need to create a deep copy of the object before passing it to the wire tap destination. Wire Tap EIP provides us with a mechanism to perform a “deep” copy of the message, by implementing the org.apache.camel.Processor class. This needs to be be called using onPrepare statement right after wireTap.
|
||||||
|
For more details, check out the AmqApplicationUnitTest.class.
|
||||||
|
|
||||||
|
### Relevant Articles:
|
||||||
|
|
||||||
|
- [Wire tap (Enterprise Integration Pattern)](https://drafts.baeldung.com/?p=103346&preview=true)
|
||||||
|
- [Intro to Apache camel](https://www.baeldung.com/apache-camel-intro)
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>wire-tap</artifactId>
|
||||||
|
<version>1.0</version>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<artifactId>enterprise-patterns</artifactId>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,2 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<order name="motor" amount="1" customer="honda"/>
|
|
@ -0,0 +1,2 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<order name="motor" amount="1" customer="honda"/>
|
|
@ -0,0 +1,70 @@
|
||||||
|
package com.baeldung;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.camel.CamelContext;
|
||||||
|
import org.apache.camel.ProducerTemplate;
|
||||||
|
import org.apache.camel.RoutesBuilder;
|
||||||
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
import org.apache.camel.component.jms.JmsComponent;
|
||||||
|
import org.apache.camel.impl.DefaultCamelContext;
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
|
@SpringBootApplication
|
||||||
|
public class AmqApplication {
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
SpringApplication.run(AmqApplication.class, args);
|
||||||
|
|
||||||
|
try (CamelContext context = new DefaultCamelContext()) {
|
||||||
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
|
||||||
|
"vm://localhost?broker.persistent=false");
|
||||||
|
connectionFactory.setTrustAllPackages(true);
|
||||||
|
context.addComponent("direct", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
|
||||||
|
addRoute(context);
|
||||||
|
|
||||||
|
try (ProducerTemplate template = context.createProducerTemplate()) {
|
||||||
|
context.start();
|
||||||
|
|
||||||
|
MyPayload payload = new MyPayload("One");
|
||||||
|
template.sendBody("direct:source", payload);
|
||||||
|
Thread.sleep(10000);
|
||||||
|
} finally {
|
||||||
|
context.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void addRoute(CamelContext context) throws Exception {
|
||||||
|
context.addRoutes(newExchangeRoute());
|
||||||
|
}
|
||||||
|
|
||||||
|
static RoutesBuilder traditionalWireTapRoute() {
|
||||||
|
return new RouteBuilder() {
|
||||||
|
public void configure() {
|
||||||
|
|
||||||
|
from("direct:source").log("Main route: Send '${body}' to tap router").wireTap("direct:tap").delay(1000)
|
||||||
|
.log("Main route: Add 'two' to '${body}'").bean(MyBean.class, "addTwo").to("direct:destination")
|
||||||
|
.log("Main route: Output '${body}'");
|
||||||
|
|
||||||
|
from("direct:tap").log("Tap Wire route: received '${body}'")
|
||||||
|
.log("Tap Wire route: Add 'three' to '${body}'").bean(MyBean.class, "addThree")
|
||||||
|
.log("Tap Wire route: Output '${body}'");
|
||||||
|
|
||||||
|
from("direct:destination").log("Output at destination: '${body}'");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
static RoutesBuilder newExchangeRoute() throws Exception {
|
||||||
|
return new RouteBuilder() {
|
||||||
|
public void configure() throws Exception {
|
||||||
|
|
||||||
|
from("direct:source").wireTap("direct:tap").onPrepare(new MyPayloadClonePrepare()).end().delay(1000);
|
||||||
|
|
||||||
|
from("direct:tap").bean(MyBean.class, "addThree");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.baeldung;
|
||||||
|
|
||||||
|
public class MyBean {
|
||||||
|
|
||||||
|
public MyPayload addTwo(MyPayload body) {
|
||||||
|
body.setValue(body.getValue() + " and two");
|
||||||
|
return body;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MyPayload addThree(MyPayload body) {
|
||||||
|
body.setValue(body.getValue() + " and three");
|
||||||
|
return body;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
package com.baeldung;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class MyPayload implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
public MyPayload(String value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setValue(String value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MyPayload deepClone() {
|
||||||
|
MyPayload myPayload = new MyPayload(value);
|
||||||
|
return myPayload;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
package com.baeldung;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
import org.apache.camel.Exchange;
|
||||||
|
import org.apache.camel.Processor;
|
||||||
|
|
||||||
|
public class MyPayloadClonePrepare implements Processor {
|
||||||
|
|
||||||
|
public void process(Exchange exchange) throws Exception {
|
||||||
|
MyPayload myPayload = exchange.getIn().getBody(MyPayload.class);
|
||||||
|
exchange.getIn().setBody(myPayload.deepClone());
|
||||||
|
exchange.getIn().setHeader("date", new Date());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
# to keep the JVM running
|
||||||
|
camel.springboot.main-run-controller = true
|
||||||
|
|
||||||
|
#configure the URL of the remote ActiveMQ broker
|
||||||
|
#camel.component.activemq.broker-url=tcp://localhost:61616
|
||||||
|
#spring.activemq.broker-url=tcp://localhost:61616
|
||||||
|
|
||||||
|
spring.activemq.in-memory=true
|
||||||
|
spring.activemq.pool.enabled=false
|
|
@ -0,0 +1,16 @@
|
||||||
|
# Root logger option
|
||||||
|
log4j.rootLogger=INFO, file, console
|
||||||
|
|
||||||
|
log4j.logger.com.javarticles=INFO, file
|
||||||
|
|
||||||
|
# Direct log messages to a log file
|
||||||
|
log4j.appender.file=org.apache.log4j.FileAppender
|
||||||
|
log4j.appender.file.File=javarticles.log
|
||||||
|
log4j.appender.file.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.file.layout.ConversionPattern=%d | %p | %F %L | %m%n
|
||||||
|
|
||||||
|
# Direct log messages to stdout
|
||||||
|
log4j.appender.console=org.apache.log4j.ConsoleAppender
|
||||||
|
log4j.appender.console.Target=System.out
|
||||||
|
log4j.appender.console.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.console.layout.ConversionPattern=%d{HH:mm}| %p | %F %L | %m%n
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 8
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 9
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 0
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 1
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 2
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 3
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 4
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 5
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 6
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 7
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 8
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 9
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 0
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 1
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 2
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 3
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 4
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 5
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 6
|
|
@ -0,0 +1 @@
|
||||||
|
Test Message: 7
|
|
@ -28,6 +28,7 @@
|
||||||
<module>intercepting-filter</module>
|
<module>intercepting-filter</module>
|
||||||
<module>solid</module>
|
<module>solid</module>
|
||||||
<module>clean-architecture</module>
|
<module>clean-architecture</module>
|
||||||
|
<module>enterprise-patterns</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
|
|
Loading…
Reference in New Issue