Improving into Stomp Embedded Interceptor example
Using the regular maven plugin framework, and adding the interceptors to the regular broker.xml configuration
This commit is contained in:
parent
6496be50c7
commit
8bb3d1ceaa
|
@ -45,6 +45,7 @@ under the License.
|
||||||
<modules>
|
<modules>
|
||||||
<module>stomp</module>
|
<module>stomp</module>
|
||||||
<module>stomp-websockets</module>
|
<module>stomp-websockets</module>
|
||||||
|
<module>stomp-embedded-interceptor</module>
|
||||||
<module>stomp1.1</module>
|
<module>stomp1.1</module>
|
||||||
<module>stomp1.2</module>
|
<module>stomp1.2</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
@ -53,6 +54,7 @@ under the License.
|
||||||
<id>examples</id>
|
<id>examples</id>
|
||||||
<modules>
|
<modules>
|
||||||
<module>stomp</module>
|
<module>stomp</module>
|
||||||
|
<module>stomp-embedded-interceptor</module>
|
||||||
|
|
||||||
<!-- has to be executed manually
|
<!-- has to be executed manually
|
||||||
<module>stomp-websockets</module> -->
|
<module>stomp-websockets</module> -->
|
||||||
|
|
|
@ -68,6 +68,32 @@ under the License.
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>artemis-maven-plugin</artifactId>
|
<artifactId>artemis-maven-plugin</artifactId>
|
||||||
<executions>
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>create</id>
|
||||||
|
<goals>
|
||||||
|
<goal>create</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<!-- need to install the jar for interceptors on the server -->
|
||||||
|
<libList><arg>org.apache.activemq.examples.stomp:stomp-embedded-interceptor:${project.version}</arg></libList>
|
||||||
|
<ignore>${noServer}</ignore>
|
||||||
|
<configuration>${basedir}/target/classes/activemq/server0</configuration>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
|
<execution>
|
||||||
|
<id>start</id>
|
||||||
|
<goals>
|
||||||
|
<goal>cli</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<ignore>${noServer}</ignore>
|
||||||
|
<spawn>true</spawn>
|
||||||
|
<testURI>tcp://localhost:61616</testURI>
|
||||||
|
<args>
|
||||||
|
<param>run</param>
|
||||||
|
</args>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
<execution>
|
<execution>
|
||||||
<id>runClient</id>
|
<id>runClient</id>
|
||||||
<goals>
|
<goals>
|
||||||
|
@ -77,6 +103,18 @@ under the License.
|
||||||
<clientClass>org.apache.activemq.artemis.jms.example.StompEmbeddedWithInterceptorExample</clientClass>
|
<clientClass>org.apache.activemq.artemis.jms.example.StompEmbeddedWithInterceptorExample</clientClass>
|
||||||
</configuration>
|
</configuration>
|
||||||
</execution>
|
</execution>
|
||||||
|
<execution>
|
||||||
|
<id>stop</id>
|
||||||
|
<goals>
|
||||||
|
<goal>cli</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<ignore>${noServer}</ignore>
|
||||||
|
<args>
|
||||||
|
<param>stop</param>
|
||||||
|
</args>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
</executions>
|
</executions>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.jms.example;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
|
||||||
|
public class MyStompInterceptor implements StompFrameInterceptor {
|
||||||
|
|
||||||
|
public boolean intercept(StompFrame frame, RemotingConnection remotingConnection)
|
||||||
|
throws ActiveMQException {
|
||||||
|
|
||||||
|
StompConnection connection = (StompConnection) remotingConnection;
|
||||||
|
|
||||||
|
frame.addHeader("stompIntercepted", "Hello");
|
||||||
|
|
||||||
|
System.out.println("MyStompInterceptor Intercepted frame " + frame + " on connection " + connection);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.jms.example;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
|
||||||
|
public class RegularInterceptor implements Interceptor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
|
||||||
|
System.out.println("Regular Intercept on " + packet);
|
||||||
|
if (packet instanceof MessagePacket) {
|
||||||
|
MessagePacket messagePacket = (MessagePacket)packet;
|
||||||
|
messagePacket.getMessage().putStringProperty("regularIntercepted", "HelloAgain");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,67 +16,24 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.jms.example;
|
package org.apache.activemq.artemis.jms.example;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
|
||||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
|
||||||
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
|
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
|
|
||||||
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
|
|
||||||
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
|
|
||||||
import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
|
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
|
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
|
||||||
import javax.jms.Queue;
|
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Date;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import java.util.List;
|
|
||||||
import java.util.HashSet;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This example demonstrates how to run an ActiveMQ Artemis embedded with JMS
|
* This example demonstrates how to run an ActiveMQ Artemis embedded with JMS
|
||||||
*/
|
*/
|
||||||
public class StompEmbeddedWithInterceptorExample {
|
public class StompEmbeddedWithInterceptorExample {
|
||||||
|
|
||||||
public static class MyStompInterceptor
|
|
||||||
implements StompFrameInterceptor {
|
|
||||||
|
|
||||||
public boolean intercept(StompFrame frame, RemotingConnection remotingConnection)
|
|
||||||
throws ActiveMQException {
|
|
||||||
|
|
||||||
StompConnection connection = (StompConnection) remotingConnection;
|
|
||||||
|
|
||||||
System.out.println("Intercepted frame " + frame + " on connection " + connection);
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final String END_OF_FRAME = "\u0000";
|
private static final String END_OF_FRAME = "\u0000";
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
EmbeddedJMS jmsServer = new EmbeddedJMS();
|
|
||||||
String brokerConfigPath = StompEmbeddedWithInterceptorExample.class.getResource("/broker.xml").toString();
|
|
||||||
jmsServer.setConfigResourcePath(brokerConfigPath);
|
|
||||||
|
|
||||||
jmsServer.start();
|
|
||||||
jmsServer.getActiveMQServer().getRemotingService().addIncomingInterceptor(new MyStompInterceptor());
|
|
||||||
|
|
||||||
System.out.println("Started Embedded JMS Server");
|
|
||||||
|
|
||||||
// Step 1. Create a TCP socket to connect to the Stomp port
|
// Step 1. Create a TCP socket to connect to the Stomp port
|
||||||
Socket socket = new Socket("localhost", 61616);
|
Socket socket = new Socket("localhost", 61616);
|
||||||
|
|
||||||
|
@ -95,7 +52,7 @@ public class StompEmbeddedWithInterceptorExample {
|
||||||
// jms.queue.exampleQueue address with a text body
|
// jms.queue.exampleQueue address with a text body
|
||||||
String text = "Hello World from Stomp 1.2 !";
|
String text = "Hello World from Stomp 1.2 !";
|
||||||
String message = "SEND\n" +
|
String message = "SEND\n" +
|
||||||
"destination:jms.queue.exampleQueue\n" +
|
"destination:jms.queue.exampleQueue" +
|
||||||
"\n" +
|
"\n" +
|
||||||
text +
|
text +
|
||||||
END_OF_FRAME;
|
END_OF_FRAME;
|
||||||
|
@ -111,8 +68,28 @@ public class StompEmbeddedWithInterceptorExample {
|
||||||
// Step 5. Slose the TCP socket
|
// Step 5. Slose the TCP socket
|
||||||
socket.close();
|
socket.close();
|
||||||
|
|
||||||
Thread.sleep(1000);
|
|
||||||
jmsServer.stop();
|
// It will use a regular JMS connection to show how the injected data will appear at the final message
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||||
|
Connection connection = factory.createConnection();
|
||||||
|
Session session = connection.createSession();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(session.createQueue("exampleQueue"));
|
||||||
|
Message messageReceived = consumer.receive(5000);
|
||||||
|
|
||||||
|
String propStomp = messageReceived.getStringProperty("stompIntercepted");
|
||||||
|
|
||||||
|
String propRegular = messageReceived.getStringProperty("regularIntercepted");
|
||||||
|
|
||||||
|
System.out.println("propStomp is Hello!! - " + propStomp.equals("Hello"));
|
||||||
|
System.out.println("propRegular is HelloAgain!! - " + propRegular.equals("HelloAgain"));
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
factory.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void sendFrame(Socket socket, String data) throws Exception {
|
private static void sendFrame(Socket socket, String data) throws Exception {
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
## contributor license agreements. See the NOTICE file distributed with
|
||||||
|
## this work for additional information regarding copyright ownership.
|
||||||
|
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
## (the "License"); you may not use this file except in compliance with
|
||||||
|
## the License. You may obtain a copy of the License at
|
||||||
|
##
|
||||||
|
## http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
##
|
||||||
|
## Unless required by applicable law or agreed to in writing, software
|
||||||
|
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
## See the License for the specific language governing permissions and
|
||||||
|
## limitations under the License.
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
guest=guest
|
|
@ -0,0 +1,17 @@
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
## contributor license agreements. See the NOTICE file distributed with
|
||||||
|
## this work for additional information regarding copyright ownership.
|
||||||
|
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
## (the "License"); you may not use this file except in compliance with
|
||||||
|
## the License. You may obtain a copy of the License at
|
||||||
|
##
|
||||||
|
## http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
##
|
||||||
|
## Unless required by applicable law or agreed to in writing, software
|
||||||
|
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
## See the License for the specific language governing permissions and
|
||||||
|
## limitations under the License.
|
||||||
|
## ---------------------------------------------------------------------------
|
||||||
|
guest=guest
|
|
@ -22,12 +22,40 @@ under the License.
|
||||||
xmlns="urn:activemq"
|
xmlns="urn:activemq"
|
||||||
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
|
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
|
||||||
|
|
||||||
|
<jms xmlns="urn:activemq:jms">
|
||||||
|
<!--the queue used by the example-->
|
||||||
|
<queue name="exampleQueue"/>
|
||||||
|
</jms>
|
||||||
|
|
||||||
<core xmlns="urn:activemq:core">
|
<core xmlns="urn:activemq:core">
|
||||||
|
|
||||||
<persistence-enabled>false</persistence-enabled>
|
<bindings-directory>./data/bindings</bindings-directory>
|
||||||
|
|
||||||
|
<journal-directory>./data/journal</journal-directory>
|
||||||
|
|
||||||
|
<large-messages-directory>./data/largemessages</large-messages-directory>
|
||||||
|
|
||||||
|
<paging-directory>./data/paging</paging-directory>
|
||||||
|
|
||||||
|
<queues>
|
||||||
|
<queue name="jms">
|
||||||
|
<address></address>
|
||||||
|
</queue>
|
||||||
|
</queues>
|
||||||
|
|
||||||
|
|
||||||
|
<remoting-incoming-interceptors>
|
||||||
|
<class-name>org.apache.activemq.artemis.jms.example.MyStompInterceptor</class-name>
|
||||||
|
<class-name>org.apache.activemq.artemis.jms.example.RegularInterceptor</class-name>
|
||||||
|
</remoting-incoming-interceptors>
|
||||||
|
|
||||||
|
<remoting-outgoing-interceptors>
|
||||||
|
<class-name>org.apache.activemq.artemis.jms.example.RegularInterceptor</class-name>
|
||||||
|
</remoting-outgoing-interceptors>
|
||||||
|
|
||||||
|
<!-- Acceptors -->
|
||||||
<acceptors>
|
<acceptors>
|
||||||
<acceptor name="artemis">tcp://127.0.0.1:61616?protocols=STOMP</acceptor>
|
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||||
</acceptors>
|
</acceptors>
|
||||||
|
|
||||||
<!-- Other config -->
|
<!-- Other config -->
|
Loading…
Reference in New Issue