This closes #235 - on HornetQ protocol support

This closes #227 - This PR was reworked as #235 to fix a small pom issue (typo)
This commit is contained in:
Clebert Suconic 2015-05-07 15:33:55 -04:00
commit a394c831f9
17 changed files with 583 additions and 34 deletions

View File

@ -74,6 +74,8 @@ import org.apache.activemq.artemis.utils.VersionLoader;
public class ActiveMQClientProtocolManager implements ClientProtocolManager
{
private static final String handshake = "ARTEMIS";
private final int versionID = VersionLoader.getVersion().getIncrementingVersion();
private ClientSessionFactoryInternal factoryInternal;
@ -479,7 +481,6 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
if (transportConnection.isUsingProtocolHandling())
{
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
String handshake = "HORNETQ";
ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());
amqbuffer.writeBytes(handshake.getBytes());
transportConnection.write(amqbuffer);

View File

@ -29,6 +29,8 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
{
private static final String AMQP_PROTOCOL_NAME = "AMQP";
private static final String MODULE_NAME = "artemis-amqp-protocol";
private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
@Override
@ -49,4 +51,10 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
{
return SUPPORTED_PROTOCOLS;
}
@Override
public String getModuleName()
{
return MODULE_NAME;
}
}

View File

@ -0,0 +1,53 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>artemis-protocols</artifactId>
<groupId>org.apache.activemq</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>artemis-hornetq-protocol</artifactId>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<!--
JBoss Logging
-->
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
public class HQPropertiesConversionInterceptor implements Interceptor
{
private static Map<SimpleString, SimpleString> dictionary;
static
{
Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
// Add entries for outgoing messages
d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
// Add entries for incoming messages
d.put(new SimpleString("_AMQ_ACTUAL_EXPIRY"), new SimpleString("_HQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_AMQ_ORIG_ADDRESS"), new SimpleString("_HQ_ORIG_ADDRESS"));
d.put(new SimpleString("_AMQ_ORIG_QUEUE"), new SimpleString("_HQ_ORIG_QUEUE"));
d.put(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), new SimpleString("_HQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_AMQ_GROUP_ID"), new SimpleString("_HQ_GROUP_ID"));
d.put(new SimpleString("_AMQ_LARGE_COMPRESSED"), new SimpleString("_HQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_AMQ_LARGE_SIZE"), new SimpleString("_HQ_LARGE_SIZE"));
d.put(new SimpleString("_AMQ_SCHED_DELIVERY"), new SimpleString("_HQ_SCHED_DELIVERY"));
d.put(new SimpleString("_AMQ_DUPL_ID"), new SimpleString("_HQ_DUPL_ID"));
d.put(new SimpleString("_AMQ_LVQ_NAME"), new SimpleString("_HQ_LVQ_NAME"));
dictionary = Collections.unmodifiableMap(d);
}
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
{
if (isMessagePacket(packet))
{
handleReceiveMessage((MessagePacket) packet);
}
return true;
}
private void handleReceiveMessage(MessagePacket messagePacket)
{
Message message = messagePacket.getMessage();
// We are modifying the key set so we iterate over a shallow copy.
for (SimpleString property : new HashSet<>(message.getPropertyNames()))
{
if (dictionary.containsKey(property))
{
message.putObjectProperty(dictionary.get(property), message.removeProperty(property));
}
}
}
private boolean isMessagePacket(Packet packet)
{
int type = packet.getType();
return type == PacketImpl.SESS_SEND ||
type == PacketImpl.SESS_SEND_CONTINUATION ||
type == PacketImpl.SESS_SEND_LARGE ||
type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
type == PacketImpl.SESS_RECEIVE_MSG;
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* HornetQ Protocol Manager
*/
class HornetQProtocolManager extends CoreProtocolManager
{
HornetQProtocolManager(CoreProtocolManagerFactory factory, ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{
super(factory, server, incomingInterceptors, outgoingInterceptors);
}
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer)
{
//if we are not an old client then handshake
if (buffer.getByte(0) == 'H' &&
buffer.getByte(1) == 'O' &&
buffer.getByte(2) == 'R' &&
buffer.getByte(3) == 'N' &&
buffer.getByte(4) == 'E' &&
buffer.getByte(5) == 'T' &&
buffer.getByte(6) == 'Q')
{
//todo add some handshaking
buffer.readBytes(7);
}
}
@Override
public boolean isProtocol(byte[] array)
{
String frameStart = new String(array, StandardCharsets.US_ASCII);
return frameStart.startsWith("HORNETQ");
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq;
import java.util.List;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory
{
public static final String HORNETQ_PROTOCOL_NAME = "HORNETQ";
private static final String MODULE_NAME = "artemis-hornetq-protocol";
private static String[] SUPPORTED_PROTOCOLS = {HORNETQ_PROTOCOL_NAME};
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{
Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor();
incomingInterceptors.add(propertyConversionInterceptor);
outgoingInterceptors.add(propertyConversionInterceptor);
return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
}
@Override
public String[] getProtocols()
{
return SUPPORTED_PROTOCOLS;
}
@Override
public String getModuleName()
{
return MODULE_NAME;
}
}

View File

@ -0,0 +1 @@
org.apache.activemq.artemis.core.protocol.hornetq.HornetQProtocolManagerFactory

View File

@ -29,6 +29,8 @@ public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFacto
{
public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";
private static final String MODULE_NAME = "artemis-openwire-protocol";
private static String[] SUPPORTED_PROTOCOLS = {OPENWIRE_PROTOCOL_NAME};
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
@ -48,4 +50,9 @@ public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFacto
return SUPPORTED_PROTOCOLS;
}
@Override
public String getModuleName()
{
return MODULE_NAME;
}
}

View File

@ -27,6 +27,8 @@ public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory<
{
public static final String STOMP_PROTOCOL_NAME = "STOMP";
private static final String MODULE_NAME = "artemis-stomp-protocol";
private static String[] SUPPORTED_PROTOCOLS = {STOMP_PROTOCOL_NAME};
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<StompFrameInterceptor> incomingInterceptors, List<StompFrameInterceptor> outgoingInterceptors)
@ -46,4 +48,10 @@ public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory<
return SUPPORTED_PROTOCOLS;
}
@Override
public String getModuleName()
{
return MODULE_NAME;
}
}

View File

@ -35,6 +35,7 @@
<module>artemis-stomp-protocol</module>
<module>artemis-openwire-protocol</module>
<module>artemis-proton-plug</module>
<module>artemis-hornetq-protocol</module>
</modules>
</project>

View File

@ -59,7 +59,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
class CoreProtocolManager implements ProtocolManager<Interceptor>
public class CoreProtocolManager implements ProtocolManager<Interceptor>
{
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -71,7 +71,7 @@ class CoreProtocolManager implements ProtocolManager<Interceptor>
private final CoreProtocolManagerFactory protocolManagerFactory;
CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
public CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{
this.protocolManagerFactory = factory;
@ -185,13 +185,13 @@ class CoreProtocolManager implements ProtocolManager<Interceptor>
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer)
{
//if we are not an old client then handshake
if (buffer.getByte(0) == 'H' &&
buffer.getByte(1) == 'O' &&
buffer.getByte(2) == 'R' &&
buffer.getByte(3) == 'N' &&
buffer.getByte(4) == 'E' &&
buffer.getByte(5) == 'T' &&
buffer.getByte(6) == 'Q')
if (buffer.getByte(0) == 'A' &&
buffer.getByte(1) == 'R' &&
buffer.getByte(2) == 'T' &&
buffer.getByte(3) == 'E' &&
buffer.getByte(4) == 'M' &&
buffer.getByte(5) == 'I' &&
buffer.getByte(6) == 'S')
{
//todo add some handshaking
buffer.readBytes(7);

View File

@ -29,6 +29,8 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
{
private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
private static final String MODULE_NAME = "artemis-server";
/**
* {@inheritDoc} *
* @param server
@ -55,4 +57,11 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
{
return SUPPORTED_PROTOCOLS;
}
@Override
public String getModuleName()
{
return MODULE_NAME;
}
}

View File

@ -146,7 +146,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
//i know there is only 1
this.flushExecutor = flushExecutor;
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0]);
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0],
coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors),
coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
@ -161,9 +161,9 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
String[] protocols = next.getProtocols();
for (String protocol : protocols)
{
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol);
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName());
protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors),
next.filterInterceptors(outgoingInterceptors)));
next.filterInterceptors(outgoingInterceptors)));
}
}
}
@ -176,7 +176,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
String[] protocols = protocolManagerFactory.getProtocols();
for (String protocol : protocols)
{
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol);
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, protocolManagerFactory.getModuleName());
protocolMap.put(protocol, protocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors));
}
}

View File

@ -265,8 +265,8 @@ public interface ActiveMQServerLogger extends BasicLogger
void timedOutWaitingCompletions(String bridgeName, long numberOfMessages);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221043, value = "Adding protocol support {0}", format = Message.Format.MESSAGE_FORMAT)
void addingProtocolSupport(String protocolKey);
@Message(id = 221043, value = "Protocol module found: [{1}]. Adding protocol support for: {0}", format = Message.Format.MESSAGE_FORMAT)
void addingProtocolSupport(String protocolKey, String moduleName);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221045, value = "libaio is not available, switching the configuration into NIO", format = Message.Format.MESSAGE_FORMAT)

View File

@ -42,4 +42,6 @@ public interface ProtocolManagerFactory<P extends BaseInterceptor>
List<P> filterInterceptors(List<BaseInterceptor> interceptors);
String[] getProtocols();
String getModuleName();
}

View File

@ -36,6 +36,8 @@
<tools.jar>${java.home}/../lib/tools.jar</tools.jar>
<byteman.version>2.2.0</byteman.version>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
<jboss-jts.version>4.17.13.Final</jboss-jts.version>
<hornetq.version>2.4.7.Final</hornetq.version>
</properties>
<dependencies>
@ -179,19 +181,38 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-hornetq-protocol</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- Needed for JMS Bridge Tests -->
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-transaction-spi</artifactId>
<version>7.1.0.Final</version>
<exclusions>
<exclusion>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-commons</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-core-client</artifactId>
<version>${hornetq.version}</version>
</dependency>
<!-- Needed for XA tests -->
<dependency>
<groupId>org.jboss.jbossts.jts</groupId>
<artifactId>jbossjts-jacorb</artifactId>
<version>4.17.13.Final</version>
</dependency>
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-transaction-spi</artifactId>
<version>7.1.0.Final</version>
</dependency>
</dependencies>
<build>
@ -211,7 +232,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<skipTests>${skipExtraTests}</skipTests>
<!-- ensure we don't inherit a byteman jar form any env settings -->
@ -219,14 +239,6 @@
<BYTEMAN_HOME></BYTEMAN_HOME>
</environmentVariables>
<systemProperties>
<property>
<name>com.arjuna.ats.arjuna.objectstore.objectStoreDir</name>
<value>target/ObjectStore</value>
</property>
<property>
<name>ObjectStoreEnvironmentBean.objectStoreDir</name>
<value>target/ObjectStore</value>
</property>
<!--
<property>
<name>org.jboss.byteman.home</name>
@ -248,9 +260,7 @@
</systemProperties>
<!-- make sure maven puts the byteman jar in the classpath rather than in a manifest jar -->
<useManifestOnlyJar>false</useManifestOnlyJar>
<!-- when upgrading this plugin from 2.4 to 2.18.1 <forkMode>once</forkMode> was replaced with these: -->
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<forkMode>once</forkMode>
<!--
<debugForkedProcess>true</debugForkedProcess>
-->

View File

@ -0,0 +1,232 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ServiceTestBase;
import org.hornetq.api.core.client.HornetQClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class HornetQProtocolTest extends ServiceTestBase
{
protected ActiveMQServer server;
private static final Logger LOG = LoggerFactory.getLogger(HornetQProtocolTest.class);
@Before
public void setUp() throws Exception
{
startBroker();
}
@After
public void tearDown() throws Exception
{
stopBroker();
}
public void startBroker() throws Exception
{
super.setUp();
server = createServer(true, true);
addHornetQConnector();
server.start();
waitForServer(server);
}
public void stopBroker() throws Exception
{
if (server.isStarted())
{
server.stop();
server = null;
}
}
protected void addHornetQConnector() throws Exception
{
HashMap<String, Object> params = new HashMap<String, Object>();
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, "" + 5445);
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PROTOCOLS_PROP_NAME, "HORNETQ");
TransportConfiguration transportConfig = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
server.getConfiguration().getAcceptorConfigurations().add(transportConfig);
LOG.info("Added connector {} to broker", "HornetQ");
}
@Test
public void testMessagePropertiesAreTransformedBetweenCoreAndHQProtocols() throws Exception
{
org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
ClientSession coreSession = createCoreClientSession();
// Create Queue
String queueName = "test.hq.queue";
hqSession.createQueue(queueName, queueName, true);
// HornetQ Client Objects
hqSession.start();
org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
org.hornetq.api.core.client.ClientConsumer hqConsumer = hqSession.createConsumer(queueName);
// Core Client Objects
coreSession.start();
ClientConsumer coreConsumer = coreSession.createConsumer(queueName);
// Check that HornetQ Properties are correctly converted to core properties.
for (int i = 0; i < 2; i++)
{
hqProducer.send(createHQTestMessage(hqSession));
}
ClientMessage coreMessage1 = coreConsumer.receive(1000);
assertTrue(coreMessage1.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
coreSession.close();
// Check that HornetQ Properties are correctly transformed from then to HornetQ properties
org.hornetq.api.core.client.ClientMessage hqMessage1 = hqConsumer.receive(1000);
assertTrue(hqMessage1.containsProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID));
hqSession.close();
}
@Test
public void testDuplicateIDPropertyWithHornetQProtocol() throws Exception
{
org.hornetq.api.core.client.ClientSession session = createHQClientSession();
String queueName = "test.hq.queue";
session.createQueue(queueName, queueName, true);
org.hornetq.api.core.client.ClientProducer producer = session.createProducer(queueName);
org.hornetq.api.core.client.ClientConsumer consumer = session.createConsumer(queueName);
org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
String messageId = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageId);
session.start();
producer.send(message);
org.hornetq.api.core.client.ClientMessage m = consumer.receive(1000);
assertTrue(m.containsProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID));
assertNotNull(m);
producer.send(message);
m = consumer.receive(1000);
assertNull(m);
producer.send(message);
m = consumer.receive(1000);
assertNull(m);
session.close();
}
@Test
public void testDuplicateIDPropertyWithHornetQAndCoreProtocol() throws Exception
{
org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
String queueName = "test.hq.queue";
hqSession.createQueue(queueName, queueName, true);
org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
org.hornetq.api.core.client.ClientMessage message = hqSession.createMessage(false);
String messageId = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageId);
ClientSession coreSession = createCoreClientSession();
ClientConsumer coreConsumer = coreSession.createConsumer(queueName);
hqSession.start();
coreSession.start();
hqProducer.send(message);
Message m = coreConsumer.receive(1000);
assertTrue(m.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
assertNotNull(m);
hqProducer.send(message);
m = coreConsumer.receive(1000);
assertNull(m);
hqProducer.send(message);
m = coreConsumer.receive(1000);
assertNull(m);
}
private org.hornetq.api.core.client.ClientMessage createHQTestMessage(org.hornetq.api.core.client.ClientSession session)
{
org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
String v = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
return message;
}
private ClientMessage createCoreTestMessage(ClientSession session)
{
ClientMessage message = session.createMessage(false);
String v = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
return message;
}
private org.hornetq.api.core.client.ClientSession createHQClientSession() throws Exception
{
Map<String, Object> map = new HashMap<String, Object>();
map.put("host", "localhost");
map.put("port", 5445);
org.hornetq.api.core.client.ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new org.hornetq.api.core.TransportConfiguration(org.hornetq.core.remoting.impl.netty.NettyConnectorFactory.class.getName(), map));
org.hornetq.api.core.client.ClientSessionFactory sf = serverLocator.createSessionFactory();
return sf.createSession();
}
private ClientSession createCoreClientSession() throws Exception
{
Map<String, Object> map = new HashMap<String, Object>();
map.put("host", "localhost");
map.put("port", 61616);
ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), map));
ClientSessionFactory sf = serverLocator.createSessionFactory();
return sf.createSession();
}
}