ACTIVEMQ6-100 Add support for HornetQ clients

This commit is contained in:
Martyn Taylor 2015-05-07 12:54:24 +01:00 committed by Clebert Suconic
parent f07af67632
commit 77efc950af
9 changed files with 526 additions and 20 deletions

View File

@ -0,0 +1,53 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>artemis-protocols</artifactId>
<groupId>org.apache.activemq</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>artemis-hornetq-protocol</artifactId>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<!--
JBoss Logging
-->
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
public class HQPropertiesConversionInterceptor implements Interceptor
{
private static Map<SimpleString, SimpleString> dictionary;
static
{
Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
// Add entries for outgoing messages
d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
// Add entries for incoming messages
d.put(new SimpleString("_AMQ_ACTUAL_EXPIRY"), new SimpleString("_HQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_AMQ_ORIG_ADDRESS"), new SimpleString("_HQ_ORIG_ADDRESS"));
d.put(new SimpleString("_AMQ_ORIG_QUEUE"), new SimpleString("_HQ_ORIG_QUEUE"));
d.put(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), new SimpleString("_HQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_AMQ_GROUP_ID"), new SimpleString("_HQ_GROUP_ID"));
d.put(new SimpleString("_AMQ_LARGE_COMPRESSED"), new SimpleString("_HQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_AMQ_LARGE_SIZE"), new SimpleString("_HQ_LARGE_SIZE"));
d.put(new SimpleString("_AMQ_SCHED_DELIVERY"), new SimpleString("_HQ_SCHED_DELIVERY"));
d.put(new SimpleString("_AMQ_DUPL_ID"), new SimpleString("_HQ_DUPL_ID"));
d.put(new SimpleString("_AMQ_LVQ_NAME"), new SimpleString("_HQ_LVQ_NAME"));
dictionary = Collections.unmodifiableMap(d);
}
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
{
if (isMessagePacket(packet))
{
handleReceiveMessage((MessagePacket) packet);
}
return true;
}
private void handleReceiveMessage(MessagePacket messagePacket)
{
Message message = messagePacket.getMessage();
// We are modifying the key set so we iterate over a shallow copy.
for (SimpleString property : new HashSet<>(message.getPropertyNames()))
{
if (dictionary.containsKey(property))
{
message.putObjectProperty(dictionary.get(property), message.removeProperty(property));
}
}
}
private boolean isMessagePacket(Packet packet)
{
int type = packet.getType();
return type == PacketImpl.SESS_SEND ||
type == PacketImpl.SESS_SEND_CONTINUATION ||
type == PacketImpl.SESS_SEND_LARGE ||
type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
type == PacketImpl.SESS_RECEIVE_MSG;
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* HornetQ Protocol Manager
*/
class HornetQProtocolManager extends CoreProtocolManager
{
HornetQProtocolManager(CoreProtocolManagerFactory factory, ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{
super(factory, server, incomingInterceptors, outgoingInterceptors);
}
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer)
{
//if we are not an old client then handshake
if (buffer.getByte(0) == 'H' &&
buffer.getByte(1) == 'O' &&
buffer.getByte(2) == 'R' &&
buffer.getByte(3) == 'N' &&
buffer.getByte(4) == 'E' &&
buffer.getByte(5) == 'T' &&
buffer.getByte(6) == 'Q')
{
//todo add some handshaking
buffer.readBytes(7);
}
}
@Override
public boolean isProtocol(byte[] array)
{
String frameStart = new String(array, StandardCharsets.US_ASCII);
return frameStart.startsWith("HORNETQ");
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq;
import java.util.List;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory
{
public static final String HORNETQ_PROTOCOL_NAME = "HORNETQ";
private static String[] SUPPORTED_PROTOCOLS = {HORNETQ_PROTOCOL_NAME};
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{
Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor();
incomingInterceptors.add(propertyConversionInterceptor);
outgoingInterceptors.add(propertyConversionInterceptor);
return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
}
@Override
public String[] getProtocols()
{
return SUPPORTED_PROTOCOLS;
}
}

View File

@ -0,0 +1 @@
org.apache.activemq.artemis.core.protocol.hornetq.HornetQProtocolManagerFactory

View File

@ -35,6 +35,7 @@
<module>artemis-stomp-protocol</module>
<module>artemis-openwire-protocol</module>
<module>artemis-proton-plug</module>
<module>artemis-hornetq-protocol</module>
</modules>
</project>

View File

@ -59,7 +59,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
class CoreProtocolManager implements ProtocolManager<Interceptor>
public class CoreProtocolManager implements ProtocolManager<Interceptor>
{
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -71,7 +71,7 @@ class CoreProtocolManager implements ProtocolManager<Interceptor>
private final CoreProtocolManagerFactory protocolManagerFactory;
CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
public CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{
this.protocolManagerFactory = factory;

View File

@ -36,6 +36,8 @@
<tools.jar>${java.home}/../lib/tools.jar</tools.jar>
<byteman.version>2.2.0</byteman.version>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
<jboss-jts.version>4.17.13.Final</jboss-jts.version>
<hornetq.version>2.4.7.Final</hornetq.version>
</properties>
<dependencies>
@ -179,19 +181,38 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-hornetq-protocol</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- Needed for JMS Bridge Tests -->
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-transaction-spi</artifactId>
<version>7.1.0.Final</version>
<exclusions>
<exclusion>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-commons</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-core-client</artifactId>
<version>${hornetq.version}</version>
</dependency>
<!-- Needed for XA tests -->
<dependency>
<groupId>org.jboss.jbossts.jts</groupId>
<artifactId>jbossjts-jacorb</artifactId>
<version>4.17.13.Final</version>
</dependency>
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-transaction-spi</artifactId>
<version>7.1.0.Final</version>
</dependency>
</dependencies>
<build>
@ -211,7 +232,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<skipTests>${skipExtraTests}</skipTests>
<!-- ensure we don't inherit a byteman jar form any env settings -->
@ -219,14 +239,6 @@
<BYTEMAN_HOME></BYTEMAN_HOME>
</environmentVariables>
<systemProperties>
<property>
<name>com.arjuna.ats.arjuna.objectstore.objectStoreDir</name>
<value>target/ObjectStore</value>
</property>
<property>
<name>ObjectStoreEnvironmentBean.objectStoreDir</name>
<value>target/ObjectStore</value>
</property>
<!--
<property>
<name>org.jboss.byteman.home</name>
@ -248,9 +260,7 @@
</systemProperties>
<!-- make sure maven puts the byteman jar in the classpath rather than in a manifest jar -->
<useManifestOnlyJar>false</useManifestOnlyJar>
<!-- when upgrading this plugin from 2.4 to 2.18.1 <forkMode>once</forkMode> was replaced with these: -->
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<forkMode>once</forkMode>
<!--
<debugForkedProcess>true</debugForkedProcess>
-->

View File

@ -0,0 +1,232 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ServiceTestBase;
import org.hornetq.api.core.client.HornetQClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class HornetQProtocolTest extends ServiceTestBase
{
protected ActiveMQServer server;
private static final Logger LOG = LoggerFactory.getLogger(HornetQProtocolTest.class);
@Before
public void setUp() throws Exception
{
startBroker();
}
@After
public void tearDown() throws Exception
{
stopBroker();
}
public void startBroker() throws Exception
{
super.setUp();
server = createServer(true, true);
addHornetQConnector();
server.start();
waitForServer(server);
}
public void stopBroker() throws Exception
{
if (server.isStarted())
{
server.stop();
server = null;
}
}
protected void addHornetQConnector() throws Exception
{
HashMap<String, Object> params = new HashMap<String, Object>();
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, "" + 5445);
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PROTOCOLS_PROP_NAME, "HORNETQ");
TransportConfiguration transportConfig = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
server.getConfiguration().getAcceptorConfigurations().add(transportConfig);
LOG.info("Added connector {} to broker", "HornetQ");
}
@Test
public void testMessagePropertiesAreTransformedBetweenCoreAndHQProtocols() throws Exception
{
org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
ClientSession coreSession = createCoreClientSession();
// Create Queue
String queueName = "test.hq.queue";
hqSession.createQueue(queueName, queueName, true);
// HornetQ Client Objects
hqSession.start();
org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
org.hornetq.api.core.client.ClientConsumer hqConsumer = hqSession.createConsumer(queueName);
// Core Client Objects
coreSession.start();
ClientConsumer coreConsumer = coreSession.createConsumer(queueName);
// Check that HornetQ Properties are correctly converted to core properties.
for (int i = 0; i < 2; i++)
{
hqProducer.send(createHQTestMessage(hqSession));
}
ClientMessage coreMessage1 = coreConsumer.receive(1000);
assertTrue(coreMessage1.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
coreSession.close();
// Check that HornetQ Properties are correctly transformed from then to HornetQ properties
org.hornetq.api.core.client.ClientMessage hqMessage1 = hqConsumer.receive(1000);
assertTrue(hqMessage1.containsProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID));
hqSession.close();
}
@Test
public void testDuplicateIDPropertyWithHornetQProtocol() throws Exception
{
org.hornetq.api.core.client.ClientSession session = createHQClientSession();
String queueName = "test.hq.queue";
session.createQueue(queueName, queueName, true);
org.hornetq.api.core.client.ClientProducer producer = session.createProducer(queueName);
org.hornetq.api.core.client.ClientConsumer consumer = session.createConsumer(queueName);
org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
String messageId = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageId);
session.start();
producer.send(message);
org.hornetq.api.core.client.ClientMessage m = consumer.receive(1000);
assertTrue(m.containsProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID));
assertNotNull(m);
producer.send(message);
m = consumer.receive(1000);
assertNull(m);
producer.send(message);
m = consumer.receive(1000);
assertNull(m);
session.close();
}
@Test
public void testDuplicateIDPropertyWithHornetQAndCoreProtocol() throws Exception
{
org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
String queueName = "test.hq.queue";
hqSession.createQueue(queueName, queueName, true);
org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
org.hornetq.api.core.client.ClientMessage message = hqSession.createMessage(false);
String messageId = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageId);
ClientSession coreSession = createCoreClientSession();
ClientConsumer coreConsumer = coreSession.createConsumer(queueName);
hqSession.start();
coreSession.start();
hqProducer.send(message);
Message m = coreConsumer.receive(1000);
assertTrue(m.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
assertNotNull(m);
hqProducer.send(message);
m = coreConsumer.receive(1000);
assertNull(m);
hqProducer.send(message);
m = coreConsumer.receive(1000);
assertNull(m);
}
private org.hornetq.api.core.client.ClientMessage createHQTestMessage(org.hornetq.api.core.client.ClientSession session)
{
org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
String v = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
return message;
}
private ClientMessage createCoreTestMessage(ClientSession session)
{
ClientMessage message = session.createMessage(false);
String v = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
return message;
}
private org.hornetq.api.core.client.ClientSession createHQClientSession() throws Exception
{
Map<String, Object> map = new HashMap<String, Object>();
map.put("host", "localhost");
map.put("port", 5445);
org.hornetq.api.core.client.ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new org.hornetq.api.core.TransportConfiguration(org.hornetq.core.remoting.impl.netty.NettyConnectorFactory.class.getName(), map));
org.hornetq.api.core.client.ClientSessionFactory sf = serverLocator.createSessionFactory();
return sf.createSession();
}
private ClientSession createCoreClientSession() throws Exception
{
Map<String, Object> map = new HashMap<String, Object>();
map.put("host", "localhost");
map.put("port", 61616);
ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), map));
ClientSessionFactory sf = serverLocator.createSessionFactory();
return sf.createSession();
}
}