ARTEMIS-238 and ARTEMIS-236 Fixing Legacy protocol support

This commit is contained in:
Clebert Suconic 2015-10-07 22:37:34 -04:00
parent 1c067a5b96
commit 206acdac7d
55 changed files with 1283 additions and 153 deletions

View File

@ -769,6 +769,20 @@ public interface ServerLocator extends AutoCloseable {
ClientProtocolManagerFactory getProtocolManagerFactory();
void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager);
ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager);
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
ServerLocator setIncomingInterceptorList(String interceptorList);
String getIncomingInterceptorList();
ServerLocator setOutgoingInterceptorList(String interceptorList);
String getOutgoingInterceptorList();
}

View File

@ -80,7 +80,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private static final long serialVersionUID = -1615857864410205260L;
// This is the default value
private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance();
private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this);
private final boolean ha;
@ -201,12 +201,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private TransportConfiguration clusterTransportConfiguration;
/*
* *************WARNING***************
* remember that when adding any new classes that we have to support serialization with previous clients.
* If you need to, make them transient and handle the serialization yourself
* */
private final Exception traceException = new Exception();
// To be called when there are ServerLocator being finalized.
@ -619,14 +613,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientProtocolManagerFactory getProtocolManagerFactory() {
if (protocolManagerFactory == null) {
// this could happen over serialization from older versions
protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance();
// Default one in case it's null
protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this);
}
return protocolManagerFactory;
}
public void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) {
public ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) {
this.protocolManagerFactory = protocolManagerFactory;
protocolManagerFactory.setLocator(this);
return this;
}
public void disableFinalizeCheck() {
@ -860,10 +856,41 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return factory;
}
@Override
public boolean isHA() {
return ha;
}
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
@Override
public ServerLocator setIncomingInterceptorList(String interceptorList) {
feedInterceptors(incomingInterceptors, interceptorList);
return this;
}
@Override
public String getIncomingInterceptorList() {
return fromInterceptors(incomingInterceptors);
}
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
@Override
public ServerLocator setOutgoingInterceptorList(String interceptorList) {
feedInterceptors(outgoingInterceptors, interceptorList);
return this;
}
@Override
public String getOutgoingInterceptorList() {
return fromInterceptors(outgoingInterceptors);
}
public boolean isCacheLargeMessagesClient() {
return cacheLargeMessagesClient;
}
@ -1775,4 +1802,40 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public boolean isReceivedToplogy() {
return receivedTopology;
}
private String fromInterceptors(final List<Interceptor> interceptors) {
StringBuffer buffer = new StringBuffer();
boolean first = true;
for (Interceptor value : interceptors) {
if (!first) {
buffer.append(",");
}
first = false;
buffer.append(value.getClass().getName());
}
return buffer.toString();
}
private void feedInterceptors(final List<Interceptor> interceptors, final String interceptorList) {
interceptors.clear();
if (interceptorList == null || interceptorList.trim().equals("")) {
return;
}
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
String[] arrayInterceptor = interceptorList.split(",");
for (String strValue : arrayInterceptor) {
Interceptor interceptor = (Interceptor) ClassloadingUtil.newInstanceFromClassLoader(strValue.trim());
interceptors.add(interceptor);
}
return null;
}
});
}
}

View File

@ -19,10 +19,10 @@ package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
public interface ServerLocatorInternal extends ServerLocator {

View File

@ -276,7 +276,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
long sessionChannelID = connection.generateChannelID();
Packet request = new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
Packet request = newCreateSessionPacket(clientVersion, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, minLargeMessageSize, confirmationWindowSize, sessionChannelID);
try {
// channel1 reference here has to go away
@ -325,10 +325,30 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
inCreateSessionLatch.countDown();
}
} while (retry);
return newSessionContext(name, confirmationWindowSize, sessionChannel, response);
}
protected Packet newCreateSessionPacket(Version clientVersion,
String name,
String username,
String password,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
int minLargeMessageSize,
int confirmationWindowSize,
long sessionChannelID) {
return new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
}
protected SessionContext newSessionContext(String name,
int confirmationWindowSize,
Channel sessionChannel,
CreateSessionResponseMessage response) {
// these objects won't be null, otherwise it would keep retrying on the previous loop
return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
}
public boolean cleanupBeforeFailover(ActiveMQException cause) {
@ -398,7 +418,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
return connection;
}
private void sendHandshake(Connection transportConnection) {
protected void sendHandshake(Connection transportConnection) {
if (transportConnection.isUsingProtocolHandling()) {
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
@ -23,13 +24,25 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag
private static final long serialVersionUID = 1;
private static final ActiveMQClientProtocolManagerFactory INSTANCE = new ActiveMQClientProtocolManagerFactory();
private ActiveMQClientProtocolManagerFactory() {
}
public static final ActiveMQClientProtocolManagerFactory getInstance() {
return INSTANCE;
ServerLocator locator;
@Override
public ServerLocator getLocator() {
return locator;
}
@Override
public void setLocator(ServerLocator locator) {
this.locator = locator;
}
public static final ActiveMQClientProtocolManagerFactory getInstance(ServerLocator locator) {
ActiveMQClientProtocolManagerFactory factory = new ActiveMQClientProtocolManagerFactory();
factory.setLocator(locator);
return factory;
}
public ClientProtocolManager newProtocolManager() {

View File

@ -114,6 +114,19 @@ public class ActiveMQSessionContext extends SessionContext {
private int confirmationWindow;
private final String name;
protected Channel getSessionChannel() {
return sessionChannel;
}
protected String getName() {
return name;
}
protected int getConfirmationWindow() {
return confirmationWindow;
}
public ActiveMQSessionContext(String name,
RemotingConnection remotingConnection,
Channel sessionChannel,
@ -536,7 +549,7 @@ public class ActiveMQSessionContext extends SessionContext {
final boolean autoCommitAcks,
final boolean preAcknowledge,
final SimpleString defaultAddress) throws ActiveMQException {
Packet createRequest = new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress);
boolean retry;
do {
try {
@ -564,6 +577,17 @@ public class ActiveMQSessionContext extends SessionContext {
} while (retry && !session.isClosing());
}
protected CreateSessionMessage newCreateSession(String username,
String password,
int minLargeMessageSize,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
SimpleString defaultAddress) {
return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
}
@Override
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException {
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
@ -724,7 +748,7 @@ public class ActiveMQSessionContext extends SessionContext {
return ((ActiveMQConsumerContext) consumer.getConsumerContext()).getId();
}
private ClassLoader lookupTCCL() {
protected ClassLoader lookupTCCL() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return Thread.currentThread().getContextClassLoader();
@ -733,7 +757,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
private int calcWindowSize(final int windowSize) {
protected int calcWindowSize(final int windowSize) {
int clientWindowSize;
if (windowSize == -1) {
// No flow control - buffer can increase without bound! Only use with

View File

@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public abstract class MessagePacket extends PacketImpl {
public abstract class MessagePacket extends PacketImpl implements MessagePacketI {
protected MessageInternal message;

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.Message;
public interface MessagePacketI {
Message getMessage();
}

View File

@ -17,10 +17,11 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionReceiveLargeMessage extends PacketImpl {
public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI {
private final MessageInternal message;
@ -58,6 +59,11 @@ public class SessionReceiveLargeMessage extends PacketImpl {
return message;
}
@Override
public Message getMessage() {
return message;
}
public long getConsumerID() {
return consumerID;
}

View File

@ -17,10 +17,11 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionSendLargeMessage extends PacketImpl {
public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI {
/**
* Used only if largeMessage
@ -43,6 +44,11 @@ public class SessionSendLargeMessage extends PacketImpl {
return largeMessage;
}
@Override
public Message getMessage() {
return largeMessage;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
largeMessage.encodeHeadersAndProperties(buffer);

View File

@ -16,7 +16,13 @@
*/
package org.apache.activemq.artemis.spi.core.remoting;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public interface ClientProtocolManagerFactory {
ClientProtocolManager newProtocolManager();
void setLocator(ServerLocator locator);
ServerLocator getLocator();
}

View File

@ -39,6 +39,8 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -50,8 +52,10 @@ import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.jms.referenceable.ConnectionFactoryObjectFactory;
import org.apache.activemq.artemis.jms.referenceable.SerializableObjectRefAddr;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
import org.apache.activemq.artemis.uri.ConnectionFactoryParser;
import org.apache.activemq.artemis.uri.ServerLocatorParser;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
/**
* <p>ActiveMQ Artemis implementation of a JMS ConnectionFactory.</p>
@ -73,6 +77,8 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
private String password;
private String protocolManagerFactoryStr;
public void writeExternal(ObjectOutput out) throws IOException {
URI uri = toURI();
@ -121,6 +127,27 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
return uri;
}
public String getProtocolManagerFactoryStr() {
return protocolManagerFactoryStr;
}
public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) {
if (protocolManagerFactoryStr != null && !protocolManagerFactoryStr.trim().isEmpty()) {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
ClientProtocolManagerFactory protocolManagerFactory =
(ClientProtocolManagerFactory) ClassloadingUtil.newInstanceFromClassLoader(protocolManagerFactoryStr);
serverLocator.setProtocolManagerFactory(protocolManagerFactory);
return null;
}
});
this.protocolManagerFactoryStr = protocolManagerFactoryStr;
}
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
String url = in.readUTF();
ConnectionFactoryParser parser = new ConnectionFactoryParser();
@ -606,6 +633,31 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
serverLocator.setInitialMessagePacketSize(size);
}
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
public void setIncomingInterceptorList(String interceptorList) {
checkWrite();
serverLocator.setIncomingInterceptorList(interceptorList);
}
public String getIncomingInterceptorList() {
return serverLocator.getIncomingInterceptorList();
}
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
public void setOutgoingInterceptorList(String interceptorList) {
serverLocator.setOutgoingInterceptorList(interceptorList);
}
public String getOutgoingInterceptorList() {
return serverLocator.getOutgoingInterceptorList();
}
public ActiveMQConnectionFactory setUser(String user) {
checkWrite();
this.user = user;

View File

@ -26,6 +26,7 @@ import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -56,6 +57,14 @@ public class ConnectionFactoryURITest {
private static final String IPV6 = "fe80::baf6:b1ff:fe12:daf7%eth0";
private static Set<String> ignoreList = new HashSet<String>();
static {
ignoreList.add("protocolManagerFactoryStr");
ignoreList.add("incomingInterceptorList");
ignoreList.add("outgoingInterceptorList");
}
@Test
public void testIPv6() throws Exception {
Map<String,Object> params = new HashMap<>();
@ -379,6 +388,10 @@ public class ConnectionFactoryURITest {
ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException {
PropertyDescriptor[] descriptors = bean.getPropertyUtils().getPropertyDescriptors(factory);
for (PropertyDescriptor descriptor : descriptors) {
if (ignoreList.contains(descriptor.getName())) {
continue;
}
System.err.println("name::" + descriptor.getName());
if (descriptor.getWriteMethod() != null && descriptor.getReadMethod() != null) {
if (descriptor.getPropertyType() == String.class) {
String value = RandomUtil.randomString();

View File

@ -44,6 +44,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
ConnectionFactoryConfiguration setConnectorNames(List<String> connectorNames);
ConnectionFactoryConfiguration setConnectorNames(String...connectorNames);
boolean isHA();
ConnectionFactoryConfiguration setHA(boolean ha);
@ -170,5 +172,9 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
ConnectionFactoryConfiguration setFactoryType(JMSFactoryType factType);
ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr);
String getProtocolManagerFactoryStr();
JMSFactoryType getFactoryType();
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jms.server.config.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -113,6 +114,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
private String groupID = null;
private String protocolManagerFactoryStr;
private JMSFactoryType factoryType = JMSFactoryType.CF;
// Static --------------------------------------------------------
@ -170,6 +173,11 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
return this;
}
public ConnectionFactoryConfiguration setConnectorNames(final String...names) {
return this.setConnectorNames(Arrays.asList(names));
}
public boolean isHA() {
return ha;
}
@ -534,6 +542,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
factoryType = JMSFactoryType.valueOf(buffer.readInt());
protocolManagerFactoryStr = BufferHelper.readNullableSimpleStringAsString(buffer);
}
@Override
@ -618,6 +628,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.writeAsNullableSimpleString(buffer, groupID);
buffer.writeInt(factoryType.intValue());
BufferHelper.writeAsNullableSimpleString(buffer, protocolManagerFactoryStr);
}
@Override
@ -724,7 +736,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.sizeOfNullableSimpleString(groupID) +
DataConstants.SIZE_INT; // factoryType
DataConstants.SIZE_INT +
// factoryType
BufferHelper.sizeOfNullableSimpleString(protocolManagerFactoryStr);
return size;
}
@ -749,6 +764,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
return this.compressLargeMessage;
}
@Override
public ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr) {
this.protocolManagerFactoryStr = protocolManagerFactoryStr;
return this;
}
@Override
public String getProtocolManagerFactoryStr() {
return protocolManagerFactoryStr;
}
// Public --------------------------------------------------------
// Package protected ---------------------------------------------

View File

@ -955,7 +955,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
public void runException() throws Exception {
checkBindings(bindings);
ActiveMQConnectionFactory cf = internalCreateCF(storeConfig, cfConfig);
ActiveMQConnectionFactory cf = internalCreateCF(cfConfig);
ArrayList<String> bindingsToAdd = new ArrayList<String>();
@ -1075,8 +1075,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
* @param cfConfig
* @throws Exception
*/
private ActiveMQConnectionFactory internalCreateCF(final boolean persisted,
final ConnectionFactoryConfiguration cfConfig) throws Exception {
private ActiveMQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws Exception {
checkInitialised();
ActiveMQConnectionFactory cf = connectionFactories.get(cfConfig.getName());
@ -1168,6 +1167,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
cf.setGroupID(cfConfig.getGroupID());
cf.setProtocolManagerFactoryStr(cfConfig.getProtocolManagerFactoryStr());
return cf;
}
@ -1445,7 +1445,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
List<PersistedConnectionFactory> cfs = storage.recoverConnectionFactories();
for (PersistedConnectionFactory cf : cfs) {
internalCreateCF(true, cf.getConfig());
internalCreateCF(cf.getConfig());
}
List<PersistedDestination> destinations = storage.recoverDestinations();

View File

@ -85,6 +85,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
// no op
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection);

View File

@ -19,76 +19,36 @@ package org.apache.activemq.artemis.core.protocol.hornetq;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI;
import org.apache.activemq.artemis.core.protocol.hornetq.util.HQPropertiesConverter;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
public class HQPropertiesConversionInterceptor implements Interceptor {
private static Map<SimpleString, SimpleString> dictionary;
static {
Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
private final boolean replaceHQ;
// Add entries for outgoing messages
d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
// Add entries for incoming messages
d.put(new SimpleString("_AMQ_ACTUAL_EXPIRY"), new SimpleString("_HQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_AMQ_ORIG_ADDRESS"), new SimpleString("_HQ_ORIG_ADDRESS"));
d.put(new SimpleString("_AMQ_ORIG_QUEUE"), new SimpleString("_HQ_ORIG_QUEUE"));
d.put(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), new SimpleString("_HQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_AMQ_GROUP_ID"), new SimpleString("_HQ_GROUP_ID"));
d.put(new SimpleString("_AMQ_LARGE_COMPRESSED"), new SimpleString("_HQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_AMQ_LARGE_SIZE"), new SimpleString("_HQ_LARGE_SIZE"));
d.put(new SimpleString("_AMQ_SCHED_DELIVERY"), new SimpleString("_HQ_SCHED_DELIVERY"));
d.put(new SimpleString("_AMQ_DUPL_ID"), new SimpleString("_HQ_DUPL_ID"));
d.put(new SimpleString("_AMQ_LVQ_NAME"), new SimpleString("_HQ_LVQ_NAME"));
dictionary = Collections.unmodifiableMap(d);
public HQPropertiesConversionInterceptor(final boolean replaceHQ) {
this.replaceHQ = replaceHQ;
}
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (isMessagePacket(packet)) {
handleReceiveMessage((MessagePacket) packet);
if (HQPropertiesConverter.isMessagePacket(packet)) {
handleReceiveMessage((MessagePacketI) packet);
}
return true;
}
private void handleReceiveMessage(MessagePacket messagePacket) {
Message message = messagePacket.getMessage();
// We are modifying the key set so we iterate over a shallow copy.
for (SimpleString property : new HashSet<>(message.getPropertyNames())) {
if (dictionary.containsKey(property)) {
message.putObjectProperty(dictionary.get(property), message.removeProperty(property));
}
private void handleReceiveMessage(MessagePacketI messagePacket) {
if (replaceHQ) {
HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage());
}
else {
HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage());
}
}
private boolean isMessagePacket(Packet packet) {
int type = packet.getType();
return type == PacketImpl.SESS_SEND ||
type == PacketImpl.SESS_SEND_CONTINUATION ||
type == PacketImpl.SESS_SEND_LARGE ||
type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
type == PacketImpl.SESS_RECEIVE_MSG;
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.protocol.hornetq;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
@ -23,9 +26,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* HornetQ Protocol Manager
*/
@ -53,6 +53,12 @@ class HornetQProtocolManager extends CoreProtocolManager {
}
}
@Override
public boolean acceptsNoHandshake() {
return true;
}
@Override
public boolean isProtocol(byte[] array) {
String frameStart = new String(array, StandardCharsets.US_ASCII);

View File

@ -34,9 +34,8 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory {
public ProtocolManager createProtocolManager(final ActiveMQServer server,
final List<Interceptor> incomingInterceptors,
List<Interceptor> outgoingInterceptors) {
Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor();
incomingInterceptors.add(propertyConversionInterceptor);
outgoingInterceptors.add(propertyConversionInterceptor);
incomingInterceptors.add(new HQPropertiesConversionInterceptor(true));
outgoingInterceptors.add(new HQPropertiesConversionInterceptor(false));
return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq.client;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.apache.activemq.artemis.core.version.Version;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager {
private static final int VERSION_PLAYED = 123;
protected void sendHandshake(Connection transportConnection) {
}
protected SessionContext newSessionContext(String name,
int confirmationWindowSize,
Channel sessionChannel,
CreateSessionResponseMessage response) {
// these objects won't be null, otherwise it would keep retrying on the previous loop
return new HornetQClientSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
}
@Override
protected Packet newCreateSessionPacket(Version clientVersion,
String name,
String username,
String password,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
int minLargeMessageSize,
int confirmationWindowSize,
long sessionChannelID) {
return new CreateSessionMessage(name, sessionChannelID, VERSION_PLAYED, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
}
@Override
public void sendSubscribeTopology(final boolean isServer) {
getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED));
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq.client;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory {
ServerLocator locator;
@Override
public ServerLocator getLocator() {
return locator;
}
@Override
public void setLocator(ServerLocator locator) {
this.locator = locator;
locator.addIncomingInterceptor(new HQPropertiesConversionInterceptor(true));
locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false));
}
@Override
public ClientProtocolManager newProtocolManager() {
return new HornetQClientProtocolManager();
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq.client;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.client.impl.AddressQueryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
public class HornetQClientSessionContext extends ActiveMQSessionContext {
public HornetQClientSessionContext(String name,
RemotingConnection remotingConnection,
Channel sessionChannel,
int serverVersion,
int confirmationWindow) {
super(name, remotingConnection, sessionChannel, serverVersion, confirmationWindow);
}
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
return response.toQueueQuery();
}
protected CreateSessionMessage newCreateSession(String username,
String password,
int minLargeMessageSize,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
SimpleString defaultAddress) {
return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString());
}
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false);
}
public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,
int windowSize,
int maxRate,
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException {
long consumerID = idGenerator.generateID();
ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
// The value we send is just a hint
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq.util;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class HQPropertiesConverter {
private static Map<SimpleString, SimpleString> hqAmqDictionary;
private static Map<SimpleString, SimpleString> amqHqDictionary;
static {
Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
// Add entries for outgoing messages
d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
hqAmqDictionary = Collections.unmodifiableMap(d);
d = new HashMap<>();
// inverting the direction
for (Map.Entry<SimpleString, SimpleString> entry: hqAmqDictionary.entrySet()) {
d.put(entry.getValue(), entry.getKey());
}
amqHqDictionary = Collections.unmodifiableMap(d);
}
public static void replaceAMQProperties(final Message message) {
replaceDict(message, amqHqDictionary);
}
public static void replaceHQProperties(final Message message) {
replaceDict(message, hqAmqDictionary);
}
private static void replaceDict(final Message message, Map<SimpleString, SimpleString> dictionary) {
for (SimpleString property : new HashSet<>(message.getPropertyNames())) {
SimpleString replaceTo = dictionary.get(property);
if (replaceTo != null) {
message.putObjectProperty(replaceTo, message.removeProperty(property));
}
}
}
public static boolean isMessagePacket(Packet packet) {
int type = packet.getType();
return type == PacketImpl.SESS_SEND ||
type == PacketImpl.SESS_SEND_LARGE ||
type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
type == PacketImpl.SESS_RECEIVE_MSG;
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.List;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
@ -32,8 +34,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import java.util.List;
/**
* MQTTProtocolManager
*/
@ -79,6 +79,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
}
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
@Override
public void removeHandler(String name) {
// TODO add support for handlers

View File

@ -150,6 +150,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
public ProtocolManagerFactory<Interceptor> getFactory() {
return factory;
}

View File

@ -104,6 +104,11 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
this.outgoingInterceptors = outgoingInterceptors;
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
@Override
public ProtocolManagerFactory<StompFrameInterceptor> getFactory() {
return factory;

View File

@ -879,6 +879,22 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
raProperties.setProducerWindowSize(producerWindowSize);
}
public String getProtocolManagerFactoryStr() {
if (ActiveMQResourceAdapter.trace) {
ActiveMQRALogger.LOGGER.trace("getProtocolManagerFactoryStr()");
}
return raProperties.getProtocolManagerFactoryStr();
}
public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) {
if (ActiveMQResourceAdapter.trace) {
ActiveMQRALogger.LOGGER.trace("setProtocolManagerFactoryStr(" + protocolManagerFactoryStr + ")");
}
raProperties.setProtocolManagerFactoryStr(protocolManagerFactoryStr);
}
/**
* Get min large message size
*
@ -1971,6 +1987,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
if (val5 != null) {
cf.setConnectionLoadBalancingPolicyClassName(val5);
}
val5 = overrideProperties.getProtocolManagerFactoryStr() != null ? overrideProperties.getProtocolManagerFactoryStr() : raProperties.getProtocolManagerFactoryStr();
if (val5 != null) {
cf.setProtocolManagerFactoryStr(val5);
}
}
public void setManagedConnectionFactory(ActiveMQRAManagedConnectionFactory activeMQRAManagedConnectionFactory) {

View File

@ -118,6 +118,8 @@ public class ConnectionFactoryProperties {
private String groupID;
private String protocolManagerFactoryStr;
/**
* @return the transportType
*/
@ -679,6 +681,14 @@ public class ConnectionFactoryProperties {
hasBeenUpdated = true;
}
public String getProtocolManagerFactoryStr() {
return protocolManagerFactoryStr;
}
public void setProtocolManagerFactoryStr(String protocolManagerFactoryStr) {
this.protocolManagerFactoryStr = protocolManagerFactoryStr;
}
public boolean isHasBeenUpdated() {
return hasBeenUpdated;
}
@ -890,6 +900,12 @@ public class ConnectionFactoryProperties {
}
else if (!this.producerWindowSize.equals(other.producerWindowSize))
return false;
else if (!protocolManagerFactoryStr.equals(other.protocolManagerFactoryStr))
return false;
if (this.protocolManagerFactoryStr == null) {
if (other.protocolManagerFactoryStr != null)
return false;
}
if (this.reconnectAttempts == null) {
if (other.reconnectAttempts != null)
return false;
@ -971,6 +987,7 @@ public class ConnectionFactoryProperties {
result = prime * result + ((compressLargeMessage == null) ? 0 : compressLargeMessage.hashCode());
result = prime * result + ((consumerWindowSize == null) ? 0 : consumerWindowSize.hashCode());
result = prime * result + ((producerWindowSize == null) ? 0 : producerWindowSize.hashCode());
result = prime * result + ((protocolManagerFactoryStr == null) ? 0 : protocolManagerFactoryStr.hashCode());
result = prime * result + ((consumerMaxRate == null) ? 0 : consumerMaxRate.hashCode());
result = prime * result + ((confirmationWindowSize == null) ? 0 : confirmationWindowSize.hashCode());
result = prime * result + ((failoverOnInitialConnection == null) ? 0 : failoverOnInitialConnection.hashCode());

View File

@ -257,6 +257,15 @@ public interface Configuration {
Configuration addAcceptorConfiguration(final TransportConfiguration infos);
/**
* Add an acceptor to the config
* @param name the name of the acceptor
* @param uri the URI of the acceptor
* @return this
* @throws Exception in case of Parsing errors on the URI
*/
Configuration addAcceptorConfiguration(String name, String uri) throws Exception;
Configuration clearAcceptorConfigurations();
/**
@ -271,6 +280,8 @@ public interface Configuration {
Configuration addConnectorConfiguration(final String key, final TransportConfiguration info);
Configuration addConnectorConfiguration(final String name, final String uri) throws Exception;
Configuration clearConnectorConfigurations();
/**

View File

@ -51,6 +51,8 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
public class ConfigurationImpl implements Configuration, Serializable {
@ -337,6 +339,19 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
public ConfigurationImpl addAcceptorConfiguration(final String name, final String uri) throws Exception {
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
for (TransportConfiguration config : configurations) {
addAcceptorConfiguration(config);
}
return this;
}
public ConfigurationImpl clearAcceptorConfigurations() {
acceptorConfigs.clear();
return this;
@ -356,6 +371,21 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
public ConfigurationImpl addConnectorConfiguration(final String name, final String uri) throws Exception {
ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser();
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
for (TransportConfiguration config : configurations) {
addConnectorConfiguration(name, config);
}
return this;
}
public ConfigurationImpl clearConnectorConfigurations() {
connectorConfigs.clear();
return this;

View File

@ -154,7 +154,15 @@ public class ProtocolHandler {
//if we get here we assume we use the core protocol as we match nothing else
if (protocolToUse == null) {
protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL;
for (Map.Entry<String, ProtocolManager> entry : protocolMap.entrySet()) {
if (entry.getValue().acceptsNoHandshake()) {
protocolToUse = entry.getKey();
break;
}
}
if (protocolToUse == null) {
protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL;
}
}
ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse);
ConnectionCreator channelHandler = nettyAcceptor.createConnectionCreator();

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -25,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
@ -98,6 +98,11 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
/**
* no need to implement this now
*
@ -162,23 +167,25 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
@Override
public boolean isProtocol(byte[] array) {
String frameStart = new String(array, StandardCharsets.US_ASCII);
return frameStart.startsWith("ACTIVEMQ");
return isArtemis(ActiveMQBuffers.wrappedBuffer(array));
}
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
//if we are not an old client then handshake
if (buffer.getByte(0) == 'A' &&
if (isArtemis(buffer)) {
buffer.readBytes(7);
}
}
private boolean isArtemis(ActiveMQBuffer buffer) {
return buffer.getByte(0) == 'A' &&
buffer.getByte(1) == 'R' &&
buffer.getByte(2) == 'T' &&
buffer.getByte(3) == 'E' &&
buffer.getByte(4) == 'M' &&
buffer.getByte(5) == 'I' &&
buffer.getByte(6) == 'S') {
//todo add some handshaking
buffer.readBytes(7);
}
buffer.getByte(6) == 'S';
}
@Override

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@ -27,10 +28,23 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactor
*/
public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolManagerFactory {
private static final ActiveMQServerSideProtocolManagerFactory INSTANCE = new ActiveMQServerSideProtocolManagerFactory();
public static ActiveMQServerSideProtocolManagerFactory getInstance() {
return INSTANCE;
ServerLocator locator;
@Override
public ServerLocator getLocator() {
return locator;
}
@Override
public void setLocator(ServerLocator locator) {
this.locator = locator;
}
public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator) {
ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory();
instance.setLocator(locator);
return instance;
}
private ActiveMQServerSideProtocolManagerFactory() {

View File

@ -207,7 +207,7 @@ public class BackupManager implements ActiveMQComponent {
backupServerLocator.setIdentity("backupLocatorFor='" + server + "'");
backupServerLocator.setReconnectAttempts(-1);
backupServerLocator.setInitialConnectAttempts(-1);
backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator));
}
}
@ -332,7 +332,7 @@ public class BackupManager implements ActiveMQComponent {
}
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
return locator;
}
return null;

View File

@ -182,7 +182,7 @@ public class ClusterController implements ActiveMQComponent {
serverLocator.setReconnectAttempts(-1);
serverLocator.setInitialConnectAttempts(-1);
//this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
locators.put(name, serverLocator);
}
@ -237,7 +237,7 @@ public class ClusterController implements ActiveMQComponent {
* @return the Cluster Control
*/
public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) {
sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator()));
return new ClusterControl(sf, server);
}

View File

@ -129,7 +129,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
@Override
protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID);
setSessionFactory(factory);

View File

@ -595,7 +595,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
serverLocator.setAfterConnectionInternalListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.start(server.getExecutorFactory().getExecutor());
}
@ -760,7 +760,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
targetLocator.setAfterConnectionInternalListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
targetLocator.setNodeID(nodeId);

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@ -31,15 +34,12 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import java.util.List;
import java.util.Map;
/*
* Instead of loading into its own post office this will use its parent server (the actual live server) and load into that.
* Since the server is already running we have to make sure we don't route any message that may subsequently get deleted or acked.
@ -88,7 +88,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());

View File

@ -115,7 +115,7 @@ public class LiveOnlyActivation extends Activation {
try {
scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer);
//use a Node Locator to connect to the cluster
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator));
LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), activeMQServer);
scaleDownServerLocator.addClusterTopologyListener(nodeLocator);

View File

@ -55,5 +55,10 @@ public interface ProtocolManager<P extends BaseInterceptor> {
*/
MessageConverter getConverter();
/** If this protocols accepts connectoins without an initial handshake.
* If true this protocol will be the failback case no other conenctions are made.
* New designed protocols should always require a handshake. This is only useful for legacy protocols. */
boolean acceptsNoHandshake();
void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);
}

View File

@ -24,11 +24,15 @@ import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
@ -206,6 +210,18 @@ public abstract class ActiveMQTestBase extends Assert {
temporaryFolder = new TemporaryFolder(parent);
}
protected <T> T serialClone(Object object) throws Exception {
System.out.println("object::" + object);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream obOut = new ObjectOutputStream(bout);
obOut.writeObject(object);
ByteArrayInputStream binput = new ByteArrayInputStream(bout.toByteArray());
ObjectInputStream obinp = new ObjectInputStream(binput);
return (T) obinp.readObject();
}
@After
public void tearDown() throws Exception {
for (ExecutorService s : executorSet) {

View File

@ -75,7 +75,7 @@ public class ActiveMQXAResourceRecovery {
String username = parser.getUsername();
String password = parser.getPassword();
TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password, null);
xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password, null, null);
}
res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs);

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
/**
* This represents the configuration of a single connection factory.
@ -42,13 +43,14 @@ public class XARecoveryConfig {
private final String username;
private final String password;
private final Map<String, String> properties;
private final ClientProtocolManagerFactory clientProtocolManager;
public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory, String userName, String password, Map<String, String> properties) {
if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) {
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties);
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
}
else {
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties);
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
}
}
@ -57,13 +59,44 @@ public class XARecoveryConfig {
final TransportConfiguration[] transportConfiguration,
final String username,
final String password,
final Map<String, String> properties) {
this.transportConfiguration = transportConfiguration;
final Map<String, String> properties,
final ClientProtocolManagerFactory clientProtocolManager) {
TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
for (int i = 0; i < transportConfiguration.length; i++) {
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
}
this.transportConfiguration = newTransportConfiguration;
this.discoveryConfiguration = null;
this.username = username;
this.password = password;
this.ha = ha;
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this.clientProtocolManager = clientProtocolManager;
}
public XARecoveryConfig(final boolean ha,
final TransportConfiguration[] transportConfiguration,
final String username,
final String password,
final Map<String, String> properties) {
this(ha, transportConfiguration, username, password, properties, null);
}
public XARecoveryConfig(final boolean ha,
final DiscoveryGroupConfiguration discoveryConfiguration,
final String username,
final String password,
final Map<String, String> properties,
final ClientProtocolManagerFactory clientProtocolManager) {
this.discoveryConfiguration = discoveryConfiguration;
this.transportConfiguration = null;
this.username = username;
this.password = password;
this.ha = ha;
this.clientProtocolManager = clientProtocolManager;
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
}
public XARecoveryConfig(final boolean ha,
@ -71,12 +104,7 @@ public class XARecoveryConfig {
final String username,
final String password,
final Map<String, String> properties) {
this.discoveryConfiguration = discoveryConfiguration;
this.transportConfiguration = null;
this.username = username;
this.password = password;
this.ha = ha;
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this(ha, discoveryConfiguration, username, password, properties, null);
}
public boolean isHA() {
@ -103,6 +131,10 @@ public class XARecoveryConfig {
return properties;
}
public ClientProtocolManagerFactory getClientProtocolManager() {
return clientProtocolManager;
}
/**
* Create a serverLocator using the configuration
*
@ -110,10 +142,10 @@ public class XARecoveryConfig {
*/
public ServerLocator createServerLocator() {
if (getDiscoveryConfiguration() != null) {
return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration());
return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager);
}
else {
return ActiveMQClient.createServerLocator(isHA(), getTransportConfig());
return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.service.extensions.tests.recovery;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.junit.Assert;
import org.junit.Test;
public class XARecoveryConfigTest {
@Test
public void testEquals() throws Exception {
String factClass = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory";
TransportConfiguration transportConfig = new TransportConfiguration(factClass, null);
XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig},
null, null, null);
TransportConfiguration transportConfig2 = new TransportConfiguration(factClass, null);
XARecoveryConfig config2 = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig2},
null, null, null);
// They are using Different names
Assert.assertNotEquals(transportConfig, transportConfig2);
Assert.assertEquals(transportConfig.newTransportConfig(""), transportConfig2.newTransportConfig(""));
// The equals here shouldn't take the name into consideration
Assert.assertEquals(config, config2);
}
@Test
public void testNotEquals() throws Exception {
String factClass = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory";
TransportConfiguration transportConfig = new TransportConfiguration(factClass, null);
XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig},
null, null, null);
TransportConfiguration transportConfig2 = new TransportConfiguration(factClass + "2", null);
XARecoveryConfig config2 = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig2},
null, null, null);
// They are using Different names
Assert.assertNotEquals(transportConfig, transportConfig2);
Assert.assertNotEquals(transportConfig.newTransportConfig(""), transportConfig2.newTransportConfig(""));
// The equals here shouldn't take the name into consideration
Assert.assertNotEquals(config, config2);
}
}

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
import org.apache.activemq.artemis.jms.server.config.JMSConfiguration;
import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.ra.recovery.RecoveryManager;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class HornetQProtocolManagerTest extends ActiveMQTestBase {
ActiveMQServer server;
EmbeddedJMS embeddedJMS;
@Before
public void setUp() throws Exception {
super.setUp();
Configuration configuration = createDefaultConfig(false);
configuration.setPersistenceEnabled(false);
configuration.getAcceptorConfigurations().clear();
configuration.addAcceptorConfiguration("legacy", "tcp://localhost:61616?protocols=HORNETQ").
addAcceptorConfiguration("corepr", "tcp://localhost:61617?protocols=CORE");
configuration.addConnectorConfiguration("legacy", "tcp://localhost:61616");
JMSConfiguration jmsConfiguration = new JMSConfigurationImpl();
jmsConfiguration.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName("testQueue").setBindings("testQueue"));
embeddedJMS = new EmbeddedJMS();
embeddedJMS.setConfiguration(configuration);
embeddedJMS.setJmsConfiguration(jmsConfiguration);
embeddedJMS.start();
}
public void tearDown() throws Exception {
embeddedJMS.stop();
super.tearDown();
}
@Test
public void testLegacy() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=" + HornetQClientProtocolManagerFactory.class.getName());
connectionFactory.createConnection().close();
ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory("tcp://localhost:61617");
connectionFactory2.createConnection().close();
RecoveryManager manager = new RecoveryManager();
manager.register(connectionFactory, null, null, new ConcurrentHashMap<String, String>());
manager.register(connectionFactory2, null, null, new ConcurrentHashMap<String, String>());
for (XARecoveryConfig resource :manager.getResources()) {
ServerLocator locator = resource.createServerLocator();
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
session.close();
factory.close();
locator.close();
}
}
/** This test will use an ArtemisConnectionFactory with clientProtocolManager=*/
@Test
public void testLegacy2() throws Exception {
ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl();
configuration.setConnectorNames("legacy");
configuration.setName("legacy");
configuration.setProtocolManagerFactoryStr(HornetQClientProtocolManagerFactory.class.getName());
embeddedJMS.getJMSServerManager().createConnectionFactory(false, configuration, "legacy");
Queue queue = (Queue) embeddedJMS.lookup("testQueue");
ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) embeddedJMS.lookup("legacy");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Test");
for (int i = 0; i < 5; i++) {
message.setStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID.toString(), "duplicate");
producer.send(message);
}
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
TextMessage messageRec = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(messageRec);
Assert.assertEquals("Test", messageRec.getText());
Assert.assertNull(consumer.receiveNoWait());
connection.close();
connectionFactory.close();
}
}

View File

@ -17,6 +17,10 @@
package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -29,15 +33,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.hornetq.api.core.client.HornetQClient;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class HornetQProtocolTest extends ActiveMQTestBase {
protected ActiveMQServer server;
@ -59,6 +61,13 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
waitForServerToStart(server);
}
@After
public void tearDown() throws Exception {
org.hornetq.core.client.impl.ServerLocatorImpl.clearThreadPools();
super.tearDown();
}
@Test
public void testMessagePropertiesAreTransformedBetweenCoreAndHQProtocols() throws Exception {
org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
@ -83,6 +92,7 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
}
ClientMessage coreMessage1 = coreConsumer.receive(1000);
System.err.println("Messages::==" + coreMessage1);
assertTrue(coreMessage1.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
coreSession.close();
@ -93,6 +103,39 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
hqSession.close();
}
@Test
public void testLargeMessagesOverHornetQClients() throws Exception {
org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
// Create Queue
String queueName = "test.hq.queue";
hqSession.createQueue(queueName, queueName, true);
// HornetQ Client Objects
hqSession.start();
org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
org.hornetq.api.core.client.ClientConsumer hqConsumer = hqSession.createConsumer(queueName);
for (int i = 0; i < 2; i++) {
org.hornetq.api.core.client.ClientMessage hqMessage = hqSession.createMessage(true);
hqMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
hqProducer.send(hqMessage);
}
hqSession.commit();
for (int i = 0; i < 2; i++) {
org.hornetq.api.core.client.ClientMessage coreMessage1 = hqConsumer.receive(1000);
coreMessage1.acknowledge();
System.err.println("Messages::==" + coreMessage1);
for (int j = 0; j < 10 * 1024; j++) {
Assert.assertEquals(ActiveMQTestBase.getSamplebyte(j), coreMessage1.getBodyBuffer().readByte());
}
}
hqSession.close();
}
@Test
public void testDuplicateIDPropertyWithHornetQProtocol() throws Exception {
org.hornetq.api.core.client.ClientSession session = createHQClientSession();
@ -158,14 +201,14 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
}
private org.hornetq.api.core.client.ClientMessage createHQTestMessage(org.hornetq.api.core.client.ClientSession session) {
org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
org.hornetq.api.core.client.ClientMessage message = session.createMessage(true);
String v = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
return message;
}
private ClientMessage createCoreTestMessage(ClientSession session) {
ClientMessage message = session.createMessage(false);
ClientMessage message = session.createMessage(true);
String v = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
return message;

View File

@ -55,7 +55,7 @@ public class ClusterControllerTest extends ClusterTestBase {
@Test
public void controlWithDifferentConnector() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool());
ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
clusterControl.authorize();
@ -65,7 +65,7 @@ public class ClusterControllerTest extends ClusterTestBase {
@Test
public void controlWithDifferentPassword() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool());
ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
try {

View File

@ -1329,7 +1329,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
locators[node] = ActiveMQClient.createServerLocatorWithoutHA(serverTotc);
}
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node]));
locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
addServerLocator(locators[node]);

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.interceptors;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class Incoming implements Interceptor {
public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
System.out.println("Incoming:Packet : " + packet);
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
SessionReceiveMessage p = (SessionReceiveMessage) packet;
p.getMessage().putStringProperty("Incoming", "was here");
}
return true;
}
}

View File

@ -14,7 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration;
package org.apache.activemq.artemis.tests.integration.interceptors;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -40,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -1027,4 +1038,52 @@ public class InterceptorTest extends ActiveMQTestBase {
session.close();
}
@Test
public void testInterceptorOnURI() throws Exception {
locator.close();
String uri = "tcp://localhost:61616?incomingInterceptorList=" + Incoming.class.getCanonicalName() + "&outgoingInterceptorList=" + Outgoing.class.getName();
System.out.println(uri);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
// Serialize stuff to make sure the interceptors are on the URI
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.close();
objectOutputStream.writeObject(factory);
ByteArrayInputStream input = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
ObjectInputStream objectInputStream = new ObjectInputStream(input);
factory = (ActiveMQConnectionFactory) objectInputStream.readObject();
Connection connection = factory.createConnection();
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue(QUEUE.toString()));
producer.send(session.createTextMessage("HelloMessage"));
connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE.toString()));
TextMessage msg = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(consumer);
Assert.assertEquals("HelloMessage", msg.getText());
Assert.assertEquals("was here", msg.getStringProperty("Incoming"));
Assert.assertEquals("sending", msg.getStringProperty("Outgoing"));
connection.close();
factory.close();
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.interceptors;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class Outgoing implements Interceptor {
public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
System.out.println("Outgoin:Packet : " + packet);
if (packet.getType() == PacketImpl.SESS_SEND) {
SessionSendMessage p = (SessionSendMessage) packet;
p.getMessage().putStringProperty("Outgoing", "sending");
}
return true;
}
}

View File

@ -44,19 +44,32 @@ public class ConnectionTest extends JMSTestBase {
@Test
public void testThroughNewConnectionFactory() throws Exception {
testThroughNewConnectionFactory(new ActiveMQConnectionFactory("vm://0"));
testThroughNewConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616?&blockOnNonDurableSend=true&" +
"retryIntervalMultiplier=1.0&maxRetryInterval=2000&producerMaxRate=-1&" +
"blockOnDurableSend=true&connectionTTL=60000&compressLargeMessage=false&reconnectAttempts=0&" +
"cacheLargeMessagesClient=false&scheduledThreadPoolMaxSize=5&useGlobalPools=true&" +
"callFailoverTimeout=-1&initialConnectAttempts=1&clientFailureCheckPeriod=30000&" +
"blockOnAcknowledge=true&consumerWindowSize=1048576&minLargeMessageSize=102400&" +
"autoGroup=false&threadPoolMaxSize=-1&confirmationWindowSize=-1&" +
"transactionBatchSize=1048576&callTimeout=30000&preAcknowledge=false&" +
"connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance." +
"RoundRobinConnectionLoadBalancingPolicy&dupsOKBatchSize=1048576&initialMessagePacketSize=1500&" +
"consumerMaxRate=-1&retryInterval=2000&failoverOnInitialConnection=false&producerWindowSize=65536&" +
"port=61616&host=localhost#"));
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
testThroughNewConnectionFactory(connectionFactory);
// Run it again with a cloned through serialization CF, simulating JNDI lookups
connectionFactory = serialClone(connectionFactory);
testThroughNewConnectionFactory(connectionFactory);
connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?&blockOnNonDurableSend=true&" +
"retryIntervalMultiplier=1.0&maxRetryInterval=2000&producerMaxRate=-1&" +
"blockOnDurableSend=true&connectionTTL=60000&compressLargeMessage=false&reconnectAttempts=0&" +
"cacheLargeMessagesClient=false&scheduledThreadPoolMaxSize=5&useGlobalPools=true&" +
"callFailoverTimeout=-1&initialConnectAttempts=1&clientFailureCheckPeriod=30000&" +
"blockOnAcknowledge=true&consumerWindowSize=1048576&minLargeMessageSize=102400&" +
"autoGroup=false&threadPoolMaxSize=-1&confirmationWindowSize=-1&" +
"transactionBatchSize=1048576&callTimeout=30000&preAcknowledge=false&" +
"connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance." +
"RoundRobinConnectionLoadBalancingPolicy&dupsOKBatchSize=1048576&initialMessagePacketSize=1500&" +
"consumerMaxRate=-1&retryInterval=2000&failoverOnInitialConnection=false&producerWindowSize=65536&" +
"port=61616&host=localhost#");
testThroughNewConnectionFactory(connectionFactory);
// Run it again with a cloned through serialization CF, simulating JNDI lookups
connectionFactory = serialClone(connectionFactory);
testThroughNewConnectionFactory(connectionFactory);
}
private void testThroughNewConnectionFactory(ActiveMQConnectionFactory factory) throws Exception {

View File

@ -384,6 +384,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase {
" <config-property-name>JgroupsChannelRefName</config-property-name>" +
" <config-property-type>java.lang.String</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>" +
" <config-property>" +
" <description>ProtocolManagerConfig</description>" +
" <config-property-name>ProtocolManagerFactoryStr</config-property-name>" +
" <config-property-type>java.lang.String</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>";
private static String rootConfig = "<root>" + config + commentedOutConfigs + "</root>";

View File

@ -41,6 +41,8 @@ public class ConnectionFactoryPropertiesTest extends ActiveMQTestBase {
static {
UNSUPPORTED_CF_PROPERTIES = new TreeSet<String>();
UNSUPPORTED_CF_PROPERTIES.add("discoveryGroupName");
UNSUPPORTED_CF_PROPERTIES.add("incomingInterceptorList");
UNSUPPORTED_CF_PROPERTIES.add("outgoingInterceptorList");
UNSUPPORTED_RA_PROPERTIES = new TreeSet<String>();
UNSUPPORTED_RA_PROPERTIES.add("HA");