ARTEMIS-3686: Adding example showing how to do telemetry support
This is adding an example intercepting and sending messages using opentelemetry to either Jaeger, zipkin or OTLP exporter
This commit is contained in:
parent
7effa488c2
commit
7bee0d8923
|
@ -801,4 +801,10 @@ public interface Message {
|
|||
default String getStringBody() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Used for user context data. Useful on interceptors. */
|
||||
Object getUserContext(Object key);
|
||||
|
||||
/** Used for user context data. Useful on interceptors. */
|
||||
void setUserContext(Object key, Object value);
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.activemq.artemis.api.core;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
// import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; -- #ifdef DEBUG
|
||||
|
@ -27,6 +28,8 @@ public class RefCountMessage {
|
|||
private static final AtomicIntegerFieldUpdater<RefCountMessage> REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "refCount");
|
||||
private static final AtomicIntegerFieldUpdater<RefCountMessage> REF_USAGE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "usageCount");
|
||||
|
||||
private volatile HashMap userContext;
|
||||
|
||||
private volatile int durableRefCount = 0;
|
||||
|
||||
private volatile int refCount = 0;
|
||||
|
@ -170,4 +173,19 @@ public class RefCountMessage {
|
|||
return count;
|
||||
}
|
||||
|
||||
public Object getUserContext(Object key) {
|
||||
if (userContext == null) {
|
||||
return null;
|
||||
} else {
|
||||
return userContext.get(key);
|
||||
}
|
||||
}
|
||||
|
||||
public void setUserContext(Object key, Object value) {
|
||||
if (userContext == null) {
|
||||
userContext = new HashMap();
|
||||
}
|
||||
userContext.put(key, value);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -723,4 +723,13 @@ public class MessageInternalImpl implements MessageInternal {
|
|||
message.setOwner(object);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getUserContext(Object key) {
|
||||
return message.getUserContext(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUserContext(Object key, Object value) {
|
||||
message.setUserContext(key, value);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -516,4 +516,14 @@ public class OpenwireMessage implements Message {
|
|||
public int usageDown() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getUserContext(Object key) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUserContext(Object key, Object value) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -296,6 +296,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
|
||||
class FakeMessage implements Message {
|
||||
|
||||
@Override
|
||||
public Object getUserContext(Object key) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUserContext(Object key, Object value) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProtocolName() {
|
||||
// should normally not be visible in GUI
|
||||
|
|
|
@ -0,0 +1,201 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.activemq.examples.broker</groupId>
|
||||
<artifactId>jms-examples</artifactId>
|
||||
<version>2.21.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>opentracing</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>ActiveMQ Artemis Broker Plugin Example</name>
|
||||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-client-all</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-server</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-amqp-protocol</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>qpid-jms-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-api</artifactId>
|
||||
<version>1.10.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-sdk</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-semconv</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-exporter-logging</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-exporter-otlp</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-exporter-jaeger</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-exporter-zipkin</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-sdk-testing</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- https://mvnrepository.com/artifact/org.assertj/assertj-core -->
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>3.22.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>create</id>
|
||||
<phase>verify</phase>
|
||||
<configuration>
|
||||
<!-- The broker plugin will install this library on the server's classpath -->
|
||||
<libList>
|
||||
<arg>org.apache.activemq.examples.broker:opentracing:${project.version}</arg>
|
||||
</libList>
|
||||
<libListWithDeps>
|
||||
<arg>io.opentelemetry:opentelemetry-api:${opentelemetry.version}</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-sdk:${opentelemetry.version}</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-semconv:${opentelemetry-alpha.version}</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:${opentelemetry-alpha.version}
|
||||
</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-exporter-logging:${opentelemetry.version}</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-exporter-otlp:${opentelemetry.version}</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-exporter-jaeger:${opentelemetry.version}</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-exporter-zipkin:${opentelemetry.version}</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-sdk:${opentelemetry.version}</arg>
|
||||
<arg>io.opentelemetry:opentelemetry-sdk-testing:${opentelemetry.version}</arg>
|
||||
<arg>junit:junit:${junit.version}</arg>
|
||||
<arg>org.assertj:assertj-core:3.22.0</arg>
|
||||
<arg>org.mockito:mockito-core:${mockito.version}</arg>
|
||||
</libListWithDeps>
|
||||
<ignore>${noServer}</ignore>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>start</id>
|
||||
<goals>
|
||||
<goal>cli</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<spawn>true</spawn>
|
||||
<ignore>${noServer}</ignore>
|
||||
<testURI>tcp://localhost:61616</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
</args>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>runClient</id>
|
||||
<goals>
|
||||
<goal>runClient</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<clientClass>org.apache.activemq.artemis.jms.example.OpenTracingPluginExample</clientClass>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>stop</id>
|
||||
<goals>
|
||||
<goal>cli</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<ignore>${noServer}</ignore>
|
||||
<args>
|
||||
<param>stop</param>
|
||||
</args>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq.examples.broker</groupId>
|
||||
<artifactId>opentracing</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-clean-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,68 @@
|
|||
# Opentracing Plugin Example
|
||||
|
||||
This plugin
|
||||
embraces [OpenTelemetry Autoconfiguration](https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure)
|
||||
using environment-based properties to configure OpenTelemetry SDK.
|
||||
|
||||
## Run Opentracing Plugin Example
|
||||
|
||||
[![Running the Example Demo](https://img.youtube.com/vi/MVGx7QrztZQ/0.jpg)](https://www.youtube.com/watch?v=MVGx7QrztZQ)
|
||||
|
||||
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start
|
||||
and create the broker manually.
|
||||
> **_NOTE:_** You must have [jeager](https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure#jaeger-exporter) running at `http://localhost:16686`. You can learn more about Jeager [here](https://www.jaegertracing.io/)
|
||||
|
||||
> command to start your jeager instance `docker run -p 16686:16686 -p 14250:14250 jaegertracing/all-in-one:<your_version>`
|
||||
|
||||
After seeing a **`Build Success`**, open the browser, connect to your Jeager running instance and check for spans.
|
||||
|
||||
## Customise Opentracing Plugin Example
|
||||
|
||||
The [`tracing.properties`](./src/main/resources/tracing.properties) has configuration properties that
|
||||
autoconfigure [Opentelemetry Exporter](https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure#exporters)
|
||||
. We reconfigured it and used Jeager as the default exporter, sending data through at `http://localhost:14250`
|
||||
You can change this by choosing to use:
|
||||
|
||||
- [otlp exporter](https://github.com/open-telemetry/opentelemetry-java/tree/1e073fcff20697fd5f2eb39bd6246d06a1231089/sdk-extensions/autoconfigure#otlp-exporter-both-span-and-metric-exporters)
|
||||
, by uncommenting (removing `#`) the following
|
||||
- otlp enabler: `otel.traces.exporter=otlp`
|
||||
- otlp endpoint: `otel.exporter.otlp.endpoint=http://localhost:4317` Change port and host to match your running
|
||||
instance.
|
||||
- otlp traces-endpoint: `otel.exporter.otlp.traces.endpoint=http://localhost:4317` Change port and host to match
|
||||
your running instance.
|
||||
|
||||
|
||||
- [Zipkin Exporter](https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure#zipkin-exporter)
|
||||
, by uncommenting (removing `#`) the following
|
||||
- Zipkin enabler: `otel.traces.exporter=zipkin`
|
||||
- Zipkin endpoint: `otel.exporter.zipkin.endpoint=http://localhost:9411/api/v2/spans`. Change port and host to match your
|
||||
running instance.
|
||||
> **Note:** command to start Zipkin instance `docker run -p 9411:9411 openzipkin/zipkin`
|
||||
|
||||
|
||||
You can also change the default service name from `opentracing_plugin` to any string by changing the value
|
||||
of `otel.service.name`
|
||||
|
||||
## How to start exporters
|
||||
- [Zipkin](https://zipkin.io/pages/quickstart): The quickest way is by use of docker.
|
||||
- Open the terminal, copy, paste and run the command `docker run -d -p 9411:9411 openzipkin/zipkin`
|
||||
- open the browser, enter the url `http://localhost:9411` and on the page that appears, click the **Run Queries** button.
|
||||
|
||||
|
||||
- [Jeager](https://www.jaegertracing.io/docs/1.30/getting-started/): The quickest way is by use of docker.
|
||||
- open the terminal and paste the command below
|
||||
```
|
||||
docker run -d --name jaeger \
|
||||
e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
|
||||
p 5775:5775/udp \
|
||||
p 6831:6831/udp \
|
||||
p 6832:6832/udp \
|
||||
p 5778:5778 \
|
||||
p 16686:16686 \
|
||||
p 14250:14250 \
|
||||
p 14268:14268 \
|
||||
p 14269:14269 \
|
||||
p 9411:9411 \
|
||||
jaegertracing/all-in-one:1.30
|
||||
```
|
||||
- open the browser, enter the url `http://localhost:16686/search`, click **Search**, select your service-name from the dropdown below the service name and finally click **Find Traces** Button.
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jms.example;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.Properties;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.api.trace.Tracer;
|
||||
import io.opentelemetry.sdk.OpenTelemetrySdk;
|
||||
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
||||
public class OpenTracingPlugin implements ActiveMQServerPlugin {
|
||||
|
||||
private static final String OPERATION_NAME = "ArtemisMessageDelivery";
|
||||
private static OpenTelemetrySdk sdk = initOpenTracing();
|
||||
private static Tracer tracer = GlobalOpenTelemetry.getTracer(OpenTracingPlugin.class.getName());
|
||||
|
||||
public static OpenTelemetrySdk initOpenTracing() {
|
||||
try {
|
||||
InputStream input = OpenTracingPlugin.class.getClassLoader().getResourceAsStream("tracing.properties");
|
||||
if (input == null) {
|
||||
throw new NullPointerException("Unable to find tracing.properties file");
|
||||
}
|
||||
Properties prop = new Properties(System.getProperties());
|
||||
prop.load(input);
|
||||
System.setProperties(prop);
|
||||
|
||||
sdk = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
|
||||
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
}
|
||||
return sdk;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeSend(ServerSession session,
|
||||
Transaction tx,
|
||||
Message message,
|
||||
boolean direct,
|
||||
boolean noAutoCreateQueue) throws ActiveMQException {
|
||||
SpanBuilder spanBuilder = getTracer().spanBuilder(OPERATION_NAME).setAttribute("message", message.toString()).setSpanKind(SpanKind.SERVER);
|
||||
Span span = spanBuilder.startSpan();
|
||||
message.setUserContext(Span.class, span);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterSend(Transaction tx,
|
||||
Message message,
|
||||
boolean direct,
|
||||
boolean noAutoCreateQueue,
|
||||
RoutingStatus result) throws ActiveMQException {
|
||||
Span span = getSpan(message);
|
||||
span.addEvent("send " + result.name());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
|
||||
Span span = (Span) reference.getMessage().getUserContext(Span.class);
|
||||
span.addEvent("deliver " + consumer.getSessionName());
|
||||
span.end();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSendException(ServerSession session,
|
||||
Transaction tx,
|
||||
Message message,
|
||||
boolean direct,
|
||||
boolean noAutoCreateQueue,
|
||||
Exception e) throws ActiveMQException {
|
||||
getSpan(message).setStatus(StatusCode.ERROR).recordException(e);
|
||||
}
|
||||
|
||||
public Tracer getTracer() {
|
||||
return tracer;
|
||||
}
|
||||
|
||||
public void setTracer(Tracer myTracer) {
|
||||
tracer = myTracer;
|
||||
}
|
||||
|
||||
private Span getSpan(Message message) {
|
||||
Span span = (Span) message.getUserContext(Span.class);
|
||||
return span;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jms.example;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
|
||||
/**
|
||||
* A simple example which shows how to use a QueueBrowser to look at messages of a queue without removing them from the queue
|
||||
*/
|
||||
public class OpenTracingPluginExample {
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
|
||||
// This example will send and receive an AMQP message
|
||||
sendConsumeAMQP();
|
||||
|
||||
// And it will also send and receive a Core message
|
||||
sendConsumeCore();
|
||||
}
|
||||
|
||||
private static void sendConsumeAMQP() throws JMSException {
|
||||
Connection connection = null;
|
||||
ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672");
|
||||
|
||||
try {
|
||||
|
||||
// Create an amqp qpid 1.0 connection
|
||||
connection = connectionFactory.createConnection();
|
||||
|
||||
// Create a session
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// Create a sender
|
||||
Queue queue = session.createQueue("exampleQueue");
|
||||
MessageProducer sender = session.createProducer(queue);
|
||||
|
||||
// send a few simple message
|
||||
sender.send(session.createTextMessage("Hello world "));
|
||||
|
||||
connection.start();
|
||||
|
||||
// create a moving receiver, this means the message will be removed from the queue
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
// receive the simple message
|
||||
consumer.receive(5000);
|
||||
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
// close the connection
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static void sendConsumeCore() throws JMSException {
|
||||
Connection connection = null;
|
||||
try {
|
||||
// Perform a lookup on the Connection Factory
|
||||
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
|
||||
Queue queue = new ActiveMQQueue("exampleQueue");
|
||||
|
||||
// Create a JMS Connection
|
||||
connection = cf.createConnection();
|
||||
|
||||
// Create a JMS Session
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// Create a JMS Message Producer
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
// Create a Text Message
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
|
||||
// Send the Message
|
||||
producer.send(message);
|
||||
|
||||
// Create a JMS Message Consumer
|
||||
MessageConsumer messageConsumer = session.createConsumer(queue);
|
||||
|
||||
// Start the Connection
|
||||
connection.start();
|
||||
|
||||
// Receive the message
|
||||
messageConsumer.receive(5000);
|
||||
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,199 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq:core ">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<persistence-enabled>true</persistence-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO, MAPPED, NIO
|
||||
ASYNCIO: Linux Libaio
|
||||
MAPPED: mmap files
|
||||
NIO: Plain Java Files
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-datasync>true</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<journal-file-size>10M</journal-file-size>
|
||||
<!--
|
||||
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
|
||||
<network-check-NIC>theNicName</network-check-NIC>
|
||||
-->
|
||||
|
||||
<!--
|
||||
Use this to use an HTTP server to validate the network
|
||||
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
|
||||
|
||||
<!-- <network-check-period>10000</network-check-period> -->
|
||||
<!-- <network-check-timeout>1000</network-check-timeout> -->
|
||||
|
||||
<!-- this is a comma separated list, no spaces, just DNS or IPs
|
||||
it should accept IPV6
|
||||
|
||||
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
|
||||
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
|
||||
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
|
||||
<!-- <network-check-list>10.0.0.1</network-check-list> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv4 addresses -->
|
||||
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv6 addresses -->
|
||||
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
|
||||
|
||||
|
||||
|
||||
|
||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||
<disk-scan-period>5000</disk-scan-period>
|
||||
|
||||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
|
||||
<!-- should the broker detect dead locks and other issues -->
|
||||
<critical-analyzer>true</critical-analyzer>
|
||||
|
||||
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
|
||||
|
||||
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
|
||||
|
||||
<critical-analyzer-policy>HALT</critical-analyzer-policy>
|
||||
|
||||
<!-- the system will enter into page mode once you hit this limit.
|
||||
This is an estimate in bytes of how much the messages are using in memory
|
||||
|
||||
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||
You may specify a different value here if you need to customize it to your needs.
|
||||
|
||||
<global-max-size>100Mb</global-max-size>
|
||||
|
||||
-->
|
||||
|
||||
<broker-plugins>
|
||||
<broker-plugin class-name="org.apache.activemq.artemis.jms.example.OpenTracingPlugin"/>
|
||||
</broker-plugins>
|
||||
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="guest"/>
|
||||
<permission type="deleteNonDurableQueue" roles="guest"/>
|
||||
<permission type="createDurableQueue" roles="guest"/>
|
||||
<permission type="deleteDurableQueue" roles="guest"/>
|
||||
<permission type="createAddress" roles="guest"/>
|
||||
<permission type="deleteAddress" roles="guest"/>
|
||||
<permission type="consume" roles="guest"/>
|
||||
<permission type="browse" roles="guest"/>
|
||||
<permission type="send" roles="guest"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="guest"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="DLQ">
|
||||
<anycast>
|
||||
<queue name="DLQ" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="ExpiryQueue">
|
||||
<anycast>
|
||||
<queue name="ExpiryQueue" />
|
||||
</anycast>
|
||||
</address>
|
||||
|
||||
</addresses>
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,37 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#activemq
|
||||
|
||||
# using otlp exporter
|
||||
#otel.traces.exporter=otlp
|
||||
#otel.exporter.otlp.endpoint=http://localhost:4317
|
||||
#otel.exporter.otlp.traces.endpoint=http://localhost:4317
|
||||
#otel.exporter.otlp.timeout=10000
|
||||
#otel.exporter.otlp.traces.timeout=10000
|
||||
#otel.exporter.otlp.protocol=grpc
|
||||
#otel.exporter.otlp.traces.protocol=grpc
|
||||
|
||||
# service name
|
||||
otel.service.name=artemis_opentracing
|
||||
|
||||
# Jaeger exporter
|
||||
otel.traces.exporter=jaeger
|
||||
otel.exporter.jaeger.endpoint=http://localhost:14250
|
||||
otel.exporter.jaeger.timeout=10000
|
||||
|
||||
# Zipkin exporter (default)
|
||||
#otel.traces.exporter=zipkin
|
||||
#otel.exporter.zipkin.endpoint=http://localhost:9411/api/v2/spans
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jms.example;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.api.trace.Tracer;
|
||||
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyObject;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.validateMockitoUsage;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class OpenTracingPluginTest {
|
||||
|
||||
private final InMemorySpanExporter exporter = InMemorySpanExporter.create();
|
||||
@InjectMocks
|
||||
private OpenTracingPlugin plugin;
|
||||
|
||||
@Mock
|
||||
private SpanBuilder spanBuilder;
|
||||
|
||||
@Mock
|
||||
private Transaction tx;
|
||||
|
||||
@Mock
|
||||
private Message message;
|
||||
|
||||
@Mock
|
||||
private Span span;
|
||||
|
||||
@Test
|
||||
public void assertBeforeMessage() throws JMSException, ActiveMQException {
|
||||
Tracer tracer = mock(Tracer.class);
|
||||
ServerSession session = mock(ServerSession.class);
|
||||
|
||||
Mockito.doReturn(spanBuilder).when(tracer).spanBuilder(anyString());
|
||||
Mockito.doReturn(spanBuilder).when(spanBuilder).setAttribute(anyString(), anyString());
|
||||
Mockito.doReturn(spanBuilder).when(spanBuilder).setSpanKind(any(SpanKind.class));
|
||||
Mockito.doReturn(span).when(spanBuilder).startSpan();
|
||||
|
||||
plugin.setTracer(tracer);
|
||||
plugin.beforeSend(session, tx, message, true, true);
|
||||
|
||||
verify(tracer, atLeastOnce()).spanBuilder(anyString());
|
||||
verify(spanBuilder, atLeastOnce()).setAttribute(anyString(), anyString());
|
||||
verify(spanBuilder, atLeastOnce()).setSpanKind(any(SpanKind.class));
|
||||
verify(spanBuilder, atLeastOnce()).startSpan();
|
||||
verify(message, atLeastOnce()).setUserContext(anyObject(), anyObject());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void assetAfterSend() throws ActiveMQException {
|
||||
when(message.getUserContext(Span.class)).thenReturn(span);
|
||||
|
||||
plugin.afterSend(tx, message, true, true, RoutingStatus.OK);
|
||||
verify(span, atLeastOnce()).addEvent(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void assertOnSendException() throws ActiveMQException {
|
||||
ServerSession session = mock(ServerSession.class);
|
||||
Exception exception = mock(Exception.class);
|
||||
|
||||
when(message.getUserContext(Span.class)).thenReturn(span);
|
||||
when(span.setStatus(any(StatusCode.class))).thenReturn(span);
|
||||
when(span.recordException(any(Exception.class))).thenReturn(span);
|
||||
|
||||
plugin.onSendException(session, tx, message, true, true, exception);
|
||||
|
||||
verify(span, atLeastOnce()).setStatus(any(StatusCode.class));
|
||||
verify(span, atLeastOnce()).setStatus(any(StatusCode.class));
|
||||
verify(span, atLeastOnce()).recordException(any(Exception.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void assertAfterDeliver() throws ActiveMQException {
|
||||
ServerConsumer consumer = mock(ServerConsumer.class);
|
||||
MessageReference reference = mock(MessageReference.class);
|
||||
|
||||
when(reference.getMessage().getUserContext(Span.class)).thenReturn(span);
|
||||
|
||||
plugin.afterDeliver(consumer, reference);
|
||||
|
||||
verify(span, atLeastOnce()).addEvent(anyString());
|
||||
verify(span, atLeastOnce()).end();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
exporter.reset();
|
||||
GlobalOpenTelemetry.resetForTest();
|
||||
validateMockitoUsage();
|
||||
}
|
||||
}
|
|
@ -80,6 +80,7 @@ under the License.
|
|||
<module>message-priority</module>
|
||||
<module>netty-openssl</module>
|
||||
<module>no-consumer-buffering</module>
|
||||
<module>opentracing</module>
|
||||
<module>paging</module>
|
||||
<module>pre-acknowledge</module>
|
||||
<module>producer-rate-limit</module>
|
||||
|
@ -156,6 +157,7 @@ under the License.
|
|||
<module>message-priority</module>
|
||||
<module>netty-openssl</module>
|
||||
<module>no-consumer-buffering</module>
|
||||
<module>opentracing</module>
|
||||
<module>paging</module>
|
||||
<module>pre-acknowledge</module>
|
||||
<module>producer-rate-limit</module>
|
||||
|
|
21
pom.xml
21
pom.xml
|
@ -118,6 +118,10 @@
|
|||
<curator.version>5.2.0</curator.version>
|
||||
<zookeeper.version>3.6.3</zookeeper.version>
|
||||
|
||||
<!-- tracing -->
|
||||
<opentelemetry.version>1.10.0</opentelemetry.version>
|
||||
<opentelemetry-alpha.version>1.10.0-alpha</opentelemetry-alpha.version>
|
||||
|
||||
<!-- this is basically for tests -->
|
||||
<netty-tcnative-version>2.0.48.Final</netty-tcnative-version>
|
||||
<proton.version>0.33.10</proton.version>
|
||||
|
@ -549,6 +553,23 @@
|
|||
</dependency>
|
||||
<!-- ## End Examples Dependencies ## -->
|
||||
|
||||
<!-- ### START opentelemetry Dependencies ### -->
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-bom</artifactId>
|
||||
<version>${opentelemetry.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-bom-alpha</artifactId>
|
||||
<version>${opentelemetry-alpha.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<!-- ### END opentelemetry Dependencies ### -->
|
||||
|
||||
<!--needed to compile the bootstrap jar-->
|
||||
<dependency>
|
||||
<groupId>org.jgroups</groupId>
|
||||
|
|
|
@ -844,5 +844,15 @@ public class AcknowledgeTest extends ActiveMQTestBase {
|
|||
@Override
|
||||
public void setOwner(Object object) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getUserContext(Object key) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUserContext(Object key, Object value) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue