diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index bdb49bb30e..666c5c8369 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -174,10 +174,19 @@ public class TransportConfiguration implements Serializable { TransportConfiguration that = (TransportConfiguration) o; - if (!factoryClassName.equals(that.factoryClassName)) + if (!isSameHost(that)) { return false; + } + if (name != null ? !name.equals(that.name) : that.name != null) return false; + + return true; + } + + public boolean isSameHost(TransportConfiguration that) { + if (!factoryClassName.equals(that.factoryClassName)) + return false; if (params != null ? !params.equals(that.params) : that.params != null) return false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index 8f086c6bb1..b1988783c5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -769,6 +769,20 @@ public interface ServerLocator extends AutoCloseable { ClientProtocolManagerFactory getProtocolManagerFactory(); - void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager); + ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager); + + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + ServerLocator setIncomingInterceptorList(String interceptorList); + + String getIncomingInterceptorList(); + + ServerLocator setOutgoingInterceptorList(String interceptorList); + + String getOutgoingInterceptorList(); + + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index acdf1e8e35..ed588ef364 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -1269,7 +1269,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C Pair connectorPair, boolean isLast) { // if it is our connector then set the live id used for failover - if (connectorPair.getA() != null && connectorPair.getA().equals(connectorConfig)) { + if (connectorPair.getA() != null && connectorPair.getA().isSameHost(connectorConfig)) { liveNodeID = nodeID; } serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index c979246f04..3f1eead272 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -80,7 +80,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private static final long serialVersionUID = -1615857864410205260L; // This is the default value - private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(); + private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this); private final boolean ha; @@ -201,12 +201,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private TransportConfiguration clusterTransportConfiguration; - /* - * *************WARNING*************** - * remember that when adding any new classes that we have to support serialization with previous clients. - * If you need to, make them transient and handle the serialization yourself - * */ - private final Exception traceException = new Exception(); // To be called when there are ServerLocator being finalized. @@ -619,14 +613,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientProtocolManagerFactory getProtocolManagerFactory() { if (protocolManagerFactory == null) { - // this could happen over serialization from older versions - protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(); + // Default one in case it's null + protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this); } return protocolManagerFactory; } - public void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) { + public ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) { this.protocolManagerFactory = protocolManagerFactory; + protocolManagerFactory.setLocator(this); + return this; } public void disableFinalizeCheck() { @@ -860,10 +856,41 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return factory; } + @Override public boolean isHA() { return ha; } + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + @Override + public ServerLocator setIncomingInterceptorList(String interceptorList) { + feedInterceptors(incomingInterceptors, interceptorList); + return this; + } + + @Override + public String getIncomingInterceptorList() { + return fromInterceptors(incomingInterceptors); + } + + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + @Override + public ServerLocator setOutgoingInterceptorList(String interceptorList) { + feedInterceptors(outgoingInterceptors, interceptorList); + return this; + } + + @Override + public String getOutgoingInterceptorList() { + return fromInterceptors(outgoingInterceptors); + } + public boolean isCacheLargeMessagesClient() { return cacheLargeMessagesClient; } @@ -1775,4 +1802,40 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public boolean isReceivedToplogy() { return receivedTopology; } + + private String fromInterceptors(final List interceptors) { + StringBuffer buffer = new StringBuffer(); + boolean first = true; + for (Interceptor value : interceptors) { + if (!first) { + buffer.append(","); + } + first = false; + buffer.append(value.getClass().getName()); + } + + return buffer.toString(); + } + + private void feedInterceptors(final List interceptors, final String interceptorList) { + interceptors.clear(); + + if (interceptorList == null || interceptorList.trim().equals("")) { + return; + } + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + + String[] arrayInterceptor = interceptorList.split(","); + for (String strValue : arrayInterceptor) { + Interceptor interceptor = (Interceptor) ClassloadingUtil.newInstanceFromClassLoader(strValue.trim()); + interceptors.add(interceptor); + } + return null; + } + }); + + } + + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java index 42da78964f..0f945aa2bb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java @@ -19,10 +19,10 @@ package org.apache.activemq.artemis.core.client.impl; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; public interface ServerLocatorInternal extends ServerLocator { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java index f220457bff..7c11cd0a52 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java @@ -107,7 +107,7 @@ public final class TopologyMemberImpl implements TopologyMember { } public boolean isMember(TransportConfiguration configuration) { - if (getConnector().getA() != null && getConnector().getA().equals(configuration) || getConnector().getB() != null && getConnector().getB().equals(configuration)) { + if (getConnector().getA() != null && getConnector().getA().isSameHost(configuration) || getConnector().getB() != null && getConnector().getB().isSameHost(configuration)) { return true; } else { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index 73ea52925b..b45fd5a133 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -276,7 +276,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { long sessionChannelID = connection.generateChannelID(); - Packet request = new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); + Packet request = newCreateSessionPacket(clientVersion, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, minLargeMessageSize, confirmationWindowSize, sessionChannelID); try { // channel1 reference here has to go away @@ -325,10 +325,30 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { inCreateSessionLatch.countDown(); } } while (retry); + return newSessionContext(name, confirmationWindowSize, sessionChannel, response); + } + + protected Packet newCreateSessionPacket(Version clientVersion, + String name, + String username, + String password, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + int minLargeMessageSize, + int confirmationWindowSize, + long sessionChannelID) { + return new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); + } + + protected SessionContext newSessionContext(String name, + int confirmationWindowSize, + Channel sessionChannel, + CreateSessionResponseMessage response) { // these objects won't be null, otherwise it would keep retrying on the previous loop return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); - } public boolean cleanupBeforeFailover(ActiveMQException cause) { @@ -398,7 +418,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { return connection; } - private void sendHandshake(Connection transportConnection) { + protected void sendHandshake(Connection transportConnection) { if (transportConnection.isUsingProtocolHandling()) { // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length()); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java index a58834b7cc..24727c9d09 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; @@ -23,13 +24,25 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag private static final long serialVersionUID = 1; - private static final ActiveMQClientProtocolManagerFactory INSTANCE = new ActiveMQClientProtocolManagerFactory(); - private ActiveMQClientProtocolManagerFactory() { } - public static final ActiveMQClientProtocolManagerFactory getInstance() { - return INSTANCE; + ServerLocator locator; + + @Override + public ServerLocator getLocator() { + return locator; + } + + @Override + public void setLocator(ServerLocator locator) { + this.locator = locator; + } + + public static final ActiveMQClientProtocolManagerFactory getInstance(ServerLocator locator) { + ActiveMQClientProtocolManagerFactory factory = new ActiveMQClientProtocolManagerFactory(); + factory.setLocator(locator); + return factory; } public ClientProtocolManager newProtocolManager() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 5279de2a79..d8dc125dc2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -114,6 +114,19 @@ public class ActiveMQSessionContext extends SessionContext { private int confirmationWindow; private final String name; + protected Channel getSessionChannel() { + return sessionChannel; + } + + protected String getName() { + return name; + } + + protected int getConfirmationWindow() { + return confirmationWindow; + + } + public ActiveMQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, @@ -536,7 +549,7 @@ public class ActiveMQSessionContext extends SessionContext { final boolean autoCommitAcks, final boolean preAcknowledge, final SimpleString defaultAddress) throws ActiveMQException { - Packet createRequest = new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString()); + Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress); boolean retry; do { try { @@ -564,6 +577,17 @@ public class ActiveMQSessionContext extends SessionContext { } while (retry && !session.isClosing()); } + protected CreateSessionMessage newCreateSession(String username, + String password, + int minLargeMessageSize, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + SimpleString defaultAddress) { + return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString()); + } + @Override public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException { ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo(); @@ -724,7 +748,7 @@ public class ActiveMQSessionContext extends SessionContext { return ((ActiveMQConsumerContext) consumer.getConsumerContext()).getId(); } - private ClassLoader lookupTCCL() { + protected ClassLoader lookupTCCL() { return AccessController.doPrivileged(new PrivilegedAction() { public ClassLoader run() { return Thread.currentThread().getContextClassLoader(); @@ -733,7 +757,7 @@ public class ActiveMQSessionContext extends SessionContext { } - private int calcWindowSize(final int windowSize) { + protected int calcWindowSize(final int windowSize) { int clientWindowSize; if (windowSize == -1) { // No flow control - buffer can increase without bound! Only use with diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java index abcb233f44..4ed86ba754 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java @@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public abstract class MessagePacket extends PacketImpl { +public abstract class MessagePacket extends PacketImpl implements MessagePacketI { protected MessageInternal message; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java new file mode 100644 index 0000000000..ea1146f1a0 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacketI.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.Message; + +public interface MessagePacketI { + Message getMessage(); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java index 460cd237ba..8b32256dcd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java @@ -17,10 +17,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public class SessionReceiveLargeMessage extends PacketImpl { +public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI { private final MessageInternal message; @@ -58,6 +59,11 @@ public class SessionReceiveLargeMessage extends PacketImpl { return message; } + @Override + public Message getMessage() { + return message; + } + public long getConsumerID() { return consumerID; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java index 1bf9bbb026..3c7dbe79fc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendLargeMessage.java @@ -17,10 +17,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public class SessionSendLargeMessage extends PacketImpl { +public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI { /** * Used only if largeMessage @@ -43,6 +44,11 @@ public class SessionSendLargeMessage extends PacketImpl { return largeMessage; } + @Override + public Message getMessage() { + return largeMessage; + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { largeMessage.encodeHeadersAndProperties(buffer); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java index c9c78a552c..7e822384a6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManagerFactory.java @@ -16,7 +16,13 @@ */ package org.apache.activemq.artemis.spi.core.remoting; +import org.apache.activemq.artemis.api.core.client.ServerLocator; + public interface ClientProtocolManagerFactory { ClientProtocolManager newProtocolManager(); + + void setLocator(ServerLocator locator); + + ServerLocator getLocator(); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java index 912554eb56..aa29bc5b73 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java @@ -39,6 +39,8 @@ import java.io.InvalidObjectException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.net.URI; +import java.security.AccessController; +import java.security.PrivilegedAction; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -50,8 +52,10 @@ import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.jms.referenceable.ConnectionFactoryObjectFactory; import org.apache.activemq.artemis.jms.referenceable.SerializableObjectRefAddr; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; import org.apache.activemq.artemis.uri.ConnectionFactoryParser; import org.apache.activemq.artemis.uri.ServerLocatorParser; +import org.apache.activemq.artemis.utils.ClassloadingUtil; /** *

ActiveMQ Artemis implementation of a JMS ConnectionFactory.

@@ -73,6 +77,8 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable, private String password; + private String protocolManagerFactoryStr; + public void writeExternal(ObjectOutput out) throws IOException { URI uri = toURI(); @@ -121,6 +127,27 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable, return uri; } + public String getProtocolManagerFactoryStr() { + return protocolManagerFactoryStr; + } + + public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) { + + if (protocolManagerFactoryStr != null && !protocolManagerFactoryStr.trim().isEmpty()) { + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + + ClientProtocolManagerFactory protocolManagerFactory = + (ClientProtocolManagerFactory) ClassloadingUtil.newInstanceFromClassLoader(protocolManagerFactoryStr); + serverLocator.setProtocolManagerFactory(protocolManagerFactory); + return null; + } + }); + + this.protocolManagerFactoryStr = protocolManagerFactoryStr; + } + } + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { String url = in.readUTF(); ConnectionFactoryParser parser = new ConnectionFactoryParser(); @@ -606,6 +633,31 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable, serverLocator.setInitialMessagePacketSize(size); } + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + public void setIncomingInterceptorList(String interceptorList) { + checkWrite(); + serverLocator.setIncomingInterceptorList(interceptorList); + } + + public String getIncomingInterceptorList() { + return serverLocator.getIncomingInterceptorList(); + } + + /** + * @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method. + * @return this + */ + public void setOutgoingInterceptorList(String interceptorList) { + serverLocator.setOutgoingInterceptorList(interceptorList); + } + + public String getOutgoingInterceptorList() { + return serverLocator.getOutgoingInterceptorList(); + } + public ActiveMQConnectionFactory setUser(String user) { checkWrite(); this.user = user; diff --git a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java index d374756522..486320810e 100644 --- a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java +++ b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java @@ -26,6 +26,7 @@ import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -56,6 +57,14 @@ public class ConnectionFactoryURITest { private static final String IPV6 = "fe80::baf6:b1ff:fe12:daf7%eth0"; + private static Set ignoreList = new HashSet(); + + static { + ignoreList.add("protocolManagerFactoryStr"); + ignoreList.add("incomingInterceptorList"); + ignoreList.add("outgoingInterceptorList"); + } + @Test public void testIPv6() throws Exception { Map params = new HashMap<>(); @@ -379,6 +388,10 @@ public class ConnectionFactoryURITest { ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException { PropertyDescriptor[] descriptors = bean.getPropertyUtils().getPropertyDescriptors(factory); for (PropertyDescriptor descriptor : descriptors) { + if (ignoreList.contains(descriptor.getName())) { + continue; + } + System.err.println("name::" + descriptor.getName()); if (descriptor.getWriteMethod() != null && descriptor.getReadMethod() != null) { if (descriptor.getPropertyType() == String.class) { String value = RandomUtil.randomString(); diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java index 57a955f91e..ab4990c4da 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java @@ -44,6 +44,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport { ConnectionFactoryConfiguration setConnectorNames(List connectorNames); + ConnectionFactoryConfiguration setConnectorNames(String...connectorNames); + boolean isHA(); ConnectionFactoryConfiguration setHA(boolean ha); @@ -170,5 +172,9 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport { ConnectionFactoryConfiguration setFactoryType(JMSFactoryType factType); + ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr); + + String getProtocolManagerFactoryStr(); + JMSFactoryType getFactoryType(); } diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java index 43c53853f0..b5efcd7690 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jms.server.config.impl; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -113,6 +114,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf private String groupID = null; + private String protocolManagerFactoryStr; + private JMSFactoryType factoryType = JMSFactoryType.CF; // Static -------------------------------------------------------- @@ -170,6 +173,11 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return this; } + + public ConnectionFactoryConfiguration setConnectorNames(final String...names) { + return this.setConnectorNames(Arrays.asList(names)); + } + public boolean isHA() { return ha; } @@ -534,6 +542,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf groupID = BufferHelper.readNullableSimpleStringAsString(buffer); factoryType = JMSFactoryType.valueOf(buffer.readInt()); + + protocolManagerFactoryStr = BufferHelper.readNullableSimpleStringAsString(buffer); } @Override @@ -618,6 +628,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf BufferHelper.writeAsNullableSimpleString(buffer, groupID); buffer.writeInt(factoryType.intValue()); + + BufferHelper.writeAsNullableSimpleString(buffer, protocolManagerFactoryStr); } @Override @@ -724,7 +736,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf BufferHelper.sizeOfNullableSimpleString(groupID) + - DataConstants.SIZE_INT; // factoryType + DataConstants.SIZE_INT + + // factoryType + + BufferHelper.sizeOfNullableSimpleString(protocolManagerFactoryStr); return size; } @@ -749,6 +764,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return this.compressLargeMessage; } + @Override + public ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr) { + this.protocolManagerFactoryStr = protocolManagerFactoryStr; + return this; + } + + @Override + public String getProtocolManagerFactoryStr() { + return protocolManagerFactoryStr; + } + // Public -------------------------------------------------------- // Package protected --------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index ff246f2d64..99e0daa730 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -955,7 +955,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback public void runException() throws Exception { checkBindings(bindings); - ActiveMQConnectionFactory cf = internalCreateCF(storeConfig, cfConfig); + ActiveMQConnectionFactory cf = internalCreateCF(cfConfig); ArrayList bindingsToAdd = new ArrayList(); @@ -1075,8 +1075,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback * @param cfConfig * @throws Exception */ - private ActiveMQConnectionFactory internalCreateCF(final boolean persisted, - final ConnectionFactoryConfiguration cfConfig) throws Exception { + private ActiveMQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws Exception { checkInitialised(); ActiveMQConnectionFactory cf = connectionFactories.get(cfConfig.getName()); @@ -1168,6 +1167,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection()); cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages()); cf.setGroupID(cfConfig.getGroupID()); + cf.setProtocolManagerFactoryStr(cfConfig.getProtocolManagerFactoryStr()); return cf; } @@ -1445,7 +1445,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback List cfs = storage.recoverConnectionFactories(); for (PersistedConnectionFactory cf : cfs) { - internalCreateCF(true, cf.getConfig()); + internalCreateCF(cf.getConfig()); } List destinations = storage.recoverDestinations(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index 3637289db8..9ac41efeb3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -85,6 +85,11 @@ public class ProtonProtocolManager implements ProtocolManager, Noti // no op } + @Override + public boolean acceptsNoHandshake() { + return false; + } + @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection); diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java index cb47e85f41..012727f9a8 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java @@ -19,76 +19,36 @@ package org.apache.activemq.artemis.core.protocol.hornetq; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI; +import org.apache.activemq.artemis.core.protocol.hornetq.util.HQPropertiesConverter; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - public class HQPropertiesConversionInterceptor implements Interceptor { - private static Map dictionary; - static { - Map d = new HashMap(); + private final boolean replaceHQ; - // Add entries for outgoing messages - d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY")); - d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS")); - d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE")); - d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID")); - d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID")); - d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED")); - d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE")); - d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY")); - d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID")); - d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME")); - - // Add entries for incoming messages - d.put(new SimpleString("_AMQ_ACTUAL_EXPIRY"), new SimpleString("_HQ_ACTUAL_EXPIRY")); - d.put(new SimpleString("_AMQ_ORIG_ADDRESS"), new SimpleString("_HQ_ORIG_ADDRESS")); - d.put(new SimpleString("_AMQ_ORIG_QUEUE"), new SimpleString("_HQ_ORIG_QUEUE")); - d.put(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), new SimpleString("_HQ_ORIG_MESSAGE_ID")); - d.put(new SimpleString("_AMQ_GROUP_ID"), new SimpleString("_HQ_GROUP_ID")); - d.put(new SimpleString("_AMQ_LARGE_COMPRESSED"), new SimpleString("_HQ_LARGE_COMPRESSED")); - d.put(new SimpleString("_AMQ_LARGE_SIZE"), new SimpleString("_HQ_LARGE_SIZE")); - d.put(new SimpleString("_AMQ_SCHED_DELIVERY"), new SimpleString("_HQ_SCHED_DELIVERY")); - d.put(new SimpleString("_AMQ_DUPL_ID"), new SimpleString("_HQ_DUPL_ID")); - d.put(new SimpleString("_AMQ_LVQ_NAME"), new SimpleString("_HQ_LVQ_NAME")); - - dictionary = Collections.unmodifiableMap(d); + public HQPropertiesConversionInterceptor(final boolean replaceHQ) { + this.replaceHQ = replaceHQ; } @Override public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { - if (isMessagePacket(packet)) { - handleReceiveMessage((MessagePacket) packet); + + if (HQPropertiesConverter.isMessagePacket(packet)) { + handleReceiveMessage((MessagePacketI) packet); } return true; } - private void handleReceiveMessage(MessagePacket messagePacket) { - Message message = messagePacket.getMessage(); - // We are modifying the key set so we iterate over a shallow copy. - for (SimpleString property : new HashSet<>(message.getPropertyNames())) { - if (dictionary.containsKey(property)) { - message.putObjectProperty(dictionary.get(property), message.removeProperty(property)); - } + private void handleReceiveMessage(MessagePacketI messagePacket) { + if (replaceHQ) { + HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage()); + } + else { + HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage()); } } - private boolean isMessagePacket(Packet packet) { - int type = packet.getType(); - return type == PacketImpl.SESS_SEND || - type == PacketImpl.SESS_SEND_CONTINUATION || - type == PacketImpl.SESS_SEND_LARGE || - type == PacketImpl.SESS_RECEIVE_LARGE_MSG || - type == PacketImpl.SESS_RECEIVE_MSG; - } } diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java index 3d6dab5932..bd4274a0af 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.protocol.hornetq; +import java.nio.charset.StandardCharsets; +import java.util.List; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager; @@ -23,9 +26,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import java.nio.charset.StandardCharsets; -import java.util.List; - /** * HornetQ Protocol Manager */ @@ -53,6 +53,12 @@ class HornetQProtocolManager extends CoreProtocolManager { } } + @Override + public boolean acceptsNoHandshake() { + return true; + } + + @Override public boolean isProtocol(byte[] array) { String frameStart = new String(array, StandardCharsets.US_ASCII); diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java index a163459b67..33c9a7808c 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java @@ -34,9 +34,8 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory { public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) { - Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor(); - incomingInterceptors.add(propertyConversionInterceptor); - outgoingInterceptors.add(propertyConversionInterceptor); + incomingInterceptors.add(new HQPropertiesConversionInterceptor(true)); + outgoingInterceptors.add(new HQPropertiesConversionInterceptor(false)); return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); } diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java new file mode 100644 index 0000000000..a1d9a60dac --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.version.Version; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.SessionContext; + +public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager { + + private static final int VERSION_PLAYED = 123; + protected void sendHandshake(Connection transportConnection) { + } + + + protected SessionContext newSessionContext(String name, + int confirmationWindowSize, + Channel sessionChannel, + CreateSessionResponseMessage response) { + // these objects won't be null, otherwise it would keep retrying on the previous loop + return new HornetQClientSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); + } + + @Override + protected Packet newCreateSessionPacket(Version clientVersion, + String name, + String username, + String password, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + int minLargeMessageSize, + int confirmationWindowSize, + long sessionChannelID) { + return new CreateSessionMessage(name, sessionChannelID, VERSION_PLAYED, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); + } + + @Override + public void sendSubscribeTopology(final boolean isServer) { + getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); + } + + + +} diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java new file mode 100644 index 0000000000..ed57cfe94c --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; + +public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory { + + + ServerLocator locator; + + @Override + public ServerLocator getLocator() { + return locator; + } + + @Override + public void setLocator(ServerLocator locator) { + this.locator = locator; + locator.addIncomingInterceptor(new HQPropertiesConversionInterceptor(true)); + locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false)); + } + + @Override + public ClientProtocolManager newProtocolManager() { + return new HornetQClientProtocolManager(); + } +} diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java new file mode 100644 index 0000000000..169a82a684 --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.client; + +import java.util.concurrent.Executor; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.client.impl.AddressQueryImpl; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; +import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; + +public class HornetQClientSessionContext extends ActiveMQSessionContext { + + public HornetQClientSessionContext(String name, + RemotingConnection remotingConnection, + Channel sessionChannel, + int serverVersion, + int confirmationWindow) { + super(name, remotingConnection, sessionChannel, serverVersion, confirmationWindow); + } + + + public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { + SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); + SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + return response.toQueueQuery(); + } + + protected CreateSessionMessage newCreateSession(String username, + String password, + int minLargeMessageSize, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + SimpleString defaultAddress) { + return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString()); + } + + + public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { + SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); + + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false); + } + + public ClientConsumerInternal createConsumer(SimpleString queueName, + SimpleString filterString, + int windowSize, + int maxRate, + int ackBatchSize, + boolean browseOnly, + Executor executor, + Executor flowControlExecutor) throws ActiveMQException { + long consumerID = idGenerator.generateID(); + + ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); + + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); + + SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + // The actual windows size that gets used is determined by the user since + // could be overridden on the queue settings + // The value we send is just a hint + + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + } + + +} diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java new file mode 100644 index 0000000000..9240e55624 --- /dev/null +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq.util; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; + +public class HQPropertiesConverter { + + private static Map hqAmqDictionary; + private static Map amqHqDictionary; + + static { + Map d = new HashMap(); + + // Add entries for outgoing messages + d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY")); + d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS")); + d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE")); + d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID")); + d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID")); + d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED")); + d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE")); + d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY")); + d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID")); + d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME")); + + hqAmqDictionary = Collections.unmodifiableMap(d); + + d = new HashMap<>(); + // inverting the direction + for (Map.Entry entry: hqAmqDictionary.entrySet()) { + d.put(entry.getValue(), entry.getKey()); + } + + amqHqDictionary = Collections.unmodifiableMap(d); + } + + public static void replaceAMQProperties(final Message message) { + replaceDict(message, amqHqDictionary); + } + + public static void replaceHQProperties(final Message message) { + replaceDict(message, hqAmqDictionary); + } + + private static void replaceDict(final Message message, Map dictionary) { + for (SimpleString property : new HashSet<>(message.getPropertyNames())) { + SimpleString replaceTo = dictionary.get(property); + if (replaceTo != null) { + message.putObjectProperty(replaceTo, message.removeProperty(property)); + } + } + } + + public static boolean isMessagePacket(Packet packet) { + int type = packet.getType(); + return type == PacketImpl.SESS_SEND || + type == PacketImpl.SESS_SEND_LARGE || + type == PacketImpl.SESS_RECEIVE_LARGE_MSG || + type == PacketImpl.SESS_RECEIVE_MSG; + } + +} diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index ce75e4d268..e99272f767 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.List; + import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; @@ -32,8 +34,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import java.util.List; - /** * MQTTProtocolManager */ @@ -79,6 +79,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener { } } + @Override + public boolean acceptsNoHandshake() { + return false; + } + + @Override public void removeHandler(String name) { // TODO add support for handlers diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 04be7aa3ef..9b35b90e81 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -150,6 +150,11 @@ public class OpenWireProtocolManager implements ProtocolManager, No } + @Override + public boolean acceptsNoHandshake() { + return false; + } + public ProtocolManagerFactory getFactory() { return factory; } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 98d21e4227..6dcd3512b5 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -104,6 +104,11 @@ class StompProtocolManager implements ProtocolManager, No this.outgoingInterceptors = outgoingInterceptors; } + @Override + public boolean acceptsNoHandshake() { + return false; + } + @Override public ProtocolManagerFactory getFactory() { return factory; diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index b4370c9676..2a40e01463 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -879,6 +879,22 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { raProperties.setProducerWindowSize(producerWindowSize); } + public String getProtocolManagerFactoryStr() { + if (ActiveMQResourceAdapter.trace) { + ActiveMQRALogger.LOGGER.trace("getProtocolManagerFactoryStr()"); + } + + return raProperties.getProtocolManagerFactoryStr(); + } + + public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) { + if (ActiveMQResourceAdapter.trace) { + ActiveMQRALogger.LOGGER.trace("setProtocolManagerFactoryStr(" + protocolManagerFactoryStr + ")"); + } + + raProperties.setProtocolManagerFactoryStr(protocolManagerFactoryStr); + } + /** * Get min large message size * @@ -1971,6 +1987,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { if (val5 != null) { cf.setConnectionLoadBalancingPolicyClassName(val5); } + val5 = overrideProperties.getProtocolManagerFactoryStr() != null ? overrideProperties.getProtocolManagerFactoryStr() : raProperties.getProtocolManagerFactoryStr(); + if (val5 != null) { + cf.setProtocolManagerFactoryStr(val5); + } } public void setManagedConnectionFactory(ActiveMQRAManagedConnectionFactory activeMQRAManagedConnectionFactory) { diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java index 9edd1d86d0..21371865fb 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java @@ -118,6 +118,8 @@ public class ConnectionFactoryProperties { private String groupID; + private String protocolManagerFactoryStr; + /** * @return the transportType */ @@ -679,6 +681,14 @@ public class ConnectionFactoryProperties { hasBeenUpdated = true; } + public String getProtocolManagerFactoryStr() { + return protocolManagerFactoryStr; + } + + public void setProtocolManagerFactoryStr(String protocolManagerFactoryStr) { + this.protocolManagerFactoryStr = protocolManagerFactoryStr; + } + public boolean isHasBeenUpdated() { return hasBeenUpdated; } @@ -890,6 +900,12 @@ public class ConnectionFactoryProperties { } else if (!this.producerWindowSize.equals(other.producerWindowSize)) return false; + else if (!protocolManagerFactoryStr.equals(other.protocolManagerFactoryStr)) + return false; + if (this.protocolManagerFactoryStr == null) { + if (other.protocolManagerFactoryStr != null) + return false; + } if (this.reconnectAttempts == null) { if (other.reconnectAttempts != null) return false; @@ -971,6 +987,7 @@ public class ConnectionFactoryProperties { result = prime * result + ((compressLargeMessage == null) ? 0 : compressLargeMessage.hashCode()); result = prime * result + ((consumerWindowSize == null) ? 0 : consumerWindowSize.hashCode()); result = prime * result + ((producerWindowSize == null) ? 0 : producerWindowSize.hashCode()); + result = prime * result + ((protocolManagerFactoryStr == null) ? 0 : protocolManagerFactoryStr.hashCode()); result = prime * result + ((consumerMaxRate == null) ? 0 : consumerMaxRate.hashCode()); result = prime * result + ((confirmationWindowSize == null) ? 0 : confirmationWindowSize.hashCode()); result = prime * result + ((failoverOnInitialConnection == null) ? 0 : failoverOnInitialConnection.hashCode()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index ed9edb14a7..4a69128b11 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -257,6 +257,15 @@ public interface Configuration { Configuration addAcceptorConfiguration(final TransportConfiguration infos); + /** + * Add an acceptor to the config + * @param name the name of the acceptor + * @param uri the URI of the acceptor + * @return this + * @throws Exception in case of Parsing errors on the URI + */ + Configuration addAcceptorConfiguration(String name, String uri) throws Exception; + Configuration clearAcceptorConfigurations(); /** @@ -271,6 +280,8 @@ public interface Configuration { Configuration addConnectorConfiguration(final String key, final TransportConfiguration info); + Configuration addConnectorConfiguration(final String name, final String uri) throws Exception; + Configuration clearConnectorConfigurations(); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 8b21f8af27..b8b4c2ded2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -51,6 +51,8 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings; +import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser; +import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser; import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader; public class ConfigurationImpl implements Configuration, Serializable { @@ -337,6 +339,19 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + public ConfigurationImpl addAcceptorConfiguration(final String name, final String uri) throws Exception { + + AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser(); + + List configurations = parser.newObject(parser.expandURI(uri), name); + + for (TransportConfiguration config : configurations) { + addAcceptorConfiguration(config); + } + + return this; + } + public ConfigurationImpl clearAcceptorConfigurations() { acceptorConfigs.clear(); return this; @@ -356,6 +371,21 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + + public ConfigurationImpl addConnectorConfiguration(final String name, final String uri) throws Exception { + + ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser(); + + List configurations = parser.newObject(parser.expandURI(uri), name); + + for (TransportConfiguration config : configurations) { + addConnectorConfiguration(name, config); + } + + return this; + } + + public ConfigurationImpl clearConnectorConfigurations() { connectorConfigs.clear(); return this; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index e33288790a..78a5a62c4b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -154,7 +154,15 @@ public class ProtocolHandler { //if we get here we assume we use the core protocol as we match nothing else if (protocolToUse == null) { - protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL; + for (Map.Entry entry : protocolMap.entrySet()) { + if (entry.getValue().acceptsNoHandshake()) { + protocolToUse = entry.getKey(); + break; + } + } + if (protocolToUse == null) { + protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL; + } } ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse); ConnectionCreator channelHandler = nettyAcceptor.createConnectionCreator(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index da6e5b13bb..6295ed6a6c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -25,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Pair; @@ -98,6 +98,11 @@ public class CoreProtocolManager implements ProtocolManager { this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing)); } + @Override + public boolean acceptsNoHandshake() { + return false; + } + /** * no need to implement this now * @@ -162,23 +167,25 @@ public class CoreProtocolManager implements ProtocolManager { @Override public boolean isProtocol(byte[] array) { - String frameStart = new String(array, StandardCharsets.US_ASCII); - return frameStart.startsWith("ACTIVEMQ"); + return isArtemis(ActiveMQBuffers.wrappedBuffer(array)); } @Override public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { //if we are not an old client then handshake - if (buffer.getByte(0) == 'A' && + if (isArtemis(buffer)) { + buffer.readBytes(7); + } + } + + private boolean isArtemis(ActiveMQBuffer buffer) { + return buffer.getByte(0) == 'A' && buffer.getByte(1) == 'R' && buffer.getByte(2) == 'T' && buffer.getByte(3) == 'E' && buffer.getByte(4) == 'M' && buffer.getByte(5) == 'I' && - buffer.getByte(6) == 'S') { - //todo add some handshaking - buffer.readBytes(7); - } + buffer.getByte(6) == 'S'; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java index d5c0d3818a..918dd0d963 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server.cluster; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder; @@ -27,10 +28,23 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactor */ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolManagerFactory { - private static final ActiveMQServerSideProtocolManagerFactory INSTANCE = new ActiveMQServerSideProtocolManagerFactory(); - public static ActiveMQServerSideProtocolManagerFactory getInstance() { - return INSTANCE; + ServerLocator locator; + + @Override + public ServerLocator getLocator() { + return locator; + } + + @Override + public void setLocator(ServerLocator locator) { + this.locator = locator; + } + + public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator) { + ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory(); + instance.setLocator(locator); + return instance; } private ActiveMQServerSideProtocolManagerFactory() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java index 44d28c0658..bc4a0ee873 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java @@ -207,7 +207,7 @@ public class BackupManager implements ActiveMQComponent { backupServerLocator.setIdentity("backupLocatorFor='" + server + "'"); backupServerLocator.setReconnectAttempts(-1); backupServerLocator.setInitialConnectAttempts(-1); - backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator)); } } @@ -332,7 +332,7 @@ public class BackupManager implements ActiveMQComponent { } ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs); locator.setClusterConnection(true); - locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); return locator; } return null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index 1a115cfb24..bd097be333 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -182,7 +182,7 @@ public class ClusterController implements ActiveMQComponent { serverLocator.setReconnectAttempts(-1); serverLocator.setInitialConnectAttempts(-1); //this is used for replication so need to use the server packet decoder - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); locators.put(name, serverLocator); } @@ -237,7 +237,7 @@ public class ClusterController implements ActiveMQComponent { * @return the Cluster Control */ public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) { - sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator())); return new ClusterControl(sf, server); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index e336b3300d..4db67fc55d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -129,7 +129,7 @@ public class ClusterConnectionBridge extends BridgeImpl { @Override protected ClientSessionFactoryInternal createSessionFactory() throws Exception { - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID); setSessionFactory(factory); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index e6b4bf6950..d290c6e105 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -595,7 +595,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn serverLocator.setAfterConnectionInternalListener(this); - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); serverLocator.start(server.getExecutorFactory().getExecutor()); } @@ -760,7 +760,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn targetLocator.setAfterConnectionInternalListener(this); - serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); targetLocator.setNodeID(nodeId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java index 77894bfaa1..77b9f3f9c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.server.impl; +import java.util.List; +import java.util.Map; + import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; @@ -31,15 +34,12 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.QueueFactory; -import org.apache.activemq.artemis.core.server.cluster.ClusterController; import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.cluster.ClusterController; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.transaction.ResourceManager; -import java.util.List; -import java.util.Map; - /* * Instead of loading into its own post office this will use its parent server (the actual live server) and load into that. * Since the server is already running we have to make sure we don't route any message that may subsequently get deleted or acked. @@ -88,7 +88,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader { ResourceManager resourceManager, Map>> duplicateIDMap) throws Exception { ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager()); - locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) { scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java index 1c82bbf7b8..59ffd6ac2a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java @@ -115,7 +115,7 @@ public class LiveOnlyActivation extends Activation { try { scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer); //use a Node Locator to connect to the cluster - scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator)); LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), activeMQServer); scaleDownServerLocator.addClusterTopologyListener(nodeLocator); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java index 8ff315a748..3de5d5da5e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java @@ -55,5 +55,10 @@ public interface ProtocolManager

{ */ MessageConverter getConverter(); + /** If this protocols accepts connectoins without an initial handshake. + * If true this protocol will be the failback case no other conenctions are made. + * New designed protocols should always require a handshake. This is only useful for legacy protocols. */ + boolean acceptsNoHandshake(); + void handshake(NettyServerConnection connection, ActiveMQBuffer buffer); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 436ec3c6c7..715094c2a8 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -24,11 +24,15 @@ import java.beans.Introspector; import java.beans.PropertyDescriptor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.PrintWriter; import java.io.StringWriter; @@ -206,6 +210,18 @@ public abstract class ActiveMQTestBase extends Assert { temporaryFolder = new TemporaryFolder(parent); } + protected T serialClone(Object object) throws Exception { + System.out.println("object::" + object); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + ObjectOutputStream obOut = new ObjectOutputStream(bout); + obOut.writeObject(object); + + ByteArrayInputStream binput = new ByteArrayInputStream(bout.toByteArray()); + ObjectInputStream obinp = new ObjectInputStream(binput); + return (T) obinp.readObject(); + + } + @After public void tearDown() throws Exception { for (ExecutorService s : executorSet) { diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java index e6bb229764..3b2b3e7824 100644 --- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java +++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/ActiveMQXAResourceRecovery.java @@ -75,7 +75,7 @@ public class ActiveMQXAResourceRecovery { String username = parser.getUsername(); String password = parser.getPassword(); TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams); - xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password, null); + xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password, null, null); } res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs); diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java index 6b3172baeb..a75bdac9f8 100644 --- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java +++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; /** * This represents the configuration of a single connection factory. @@ -42,13 +43,14 @@ public class XARecoveryConfig { private final String username; private final String password; private final Map properties; + private final ClientProtocolManagerFactory clientProtocolManager; public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory, String userName, String password, Map properties) { if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) { - return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties); + return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory()); } else { - return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties); + return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory()); } } @@ -57,13 +59,44 @@ public class XARecoveryConfig { final TransportConfiguration[] transportConfiguration, final String username, final String password, - final Map properties) { - this.transportConfiguration = transportConfiguration; + final Map properties, + final ClientProtocolManagerFactory clientProtocolManager) { + TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length]; + for (int i = 0; i < transportConfiguration.length; i++) { + newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig(""); + } + + this.transportConfiguration = newTransportConfiguration; this.discoveryConfiguration = null; this.username = username; this.password = password; this.ha = ha; this.properties = properties == null ? Collections.unmodifiableMap(new HashMap()) : Collections.unmodifiableMap(properties); + this.clientProtocolManager = clientProtocolManager; + } + + + public XARecoveryConfig(final boolean ha, + final TransportConfiguration[] transportConfiguration, + final String username, + final String password, + final Map properties) { + this(ha, transportConfiguration, username, password, properties, null); + } + + public XARecoveryConfig(final boolean ha, + final DiscoveryGroupConfiguration discoveryConfiguration, + final String username, + final String password, + final Map properties, + final ClientProtocolManagerFactory clientProtocolManager) { + this.discoveryConfiguration = discoveryConfiguration; + this.transportConfiguration = null; + this.username = username; + this.password = password; + this.ha = ha; + this.clientProtocolManager = clientProtocolManager; + this.properties = properties == null ? Collections.unmodifiableMap(new HashMap()) : Collections.unmodifiableMap(properties); } public XARecoveryConfig(final boolean ha, @@ -71,12 +104,7 @@ public class XARecoveryConfig { final String username, final String password, final Map properties) { - this.discoveryConfiguration = discoveryConfiguration; - this.transportConfiguration = null; - this.username = username; - this.password = password; - this.ha = ha; - this.properties = properties == null ? Collections.unmodifiableMap(new HashMap()) : Collections.unmodifiableMap(properties); + this(ha, discoveryConfiguration, username, password, properties, null); } public boolean isHA() { @@ -103,6 +131,10 @@ public class XARecoveryConfig { return properties; } + public ClientProtocolManagerFactory getClientProtocolManager() { + return clientProtocolManager; + } + /** * Create a serverLocator using the configuration * @@ -110,10 +142,10 @@ public class XARecoveryConfig { */ public ServerLocator createServerLocator() { if (getDiscoveryConfiguration() != null) { - return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()); + return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager); } else { - return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()); + return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager); } } diff --git a/artemis-service-extensions/src/test/java/org/apache/activemq/artemis/service/extensions/tests/recovery/XARecoveryConfigTest.java b/artemis-service-extensions/src/test/java/org/apache/activemq/artemis/service/extensions/tests/recovery/XARecoveryConfigTest.java new file mode 100644 index 0000000000..b5f1021aa0 --- /dev/null +++ b/artemis-service-extensions/src/test/java/org/apache/activemq/artemis/service/extensions/tests/recovery/XARecoveryConfigTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.service.extensions.tests.recovery; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; +import org.junit.Assert; +import org.junit.Test; + + +public class XARecoveryConfigTest { + + @Test + public void testEquals() throws Exception { + String factClass = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory"; + + TransportConfiguration transportConfig = new TransportConfiguration(factClass, null); + XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig}, + null, null, null); + + TransportConfiguration transportConfig2 = new TransportConfiguration(factClass, null); + XARecoveryConfig config2 = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig2}, + null, null, null); + + // They are using Different names + Assert.assertNotEquals(transportConfig, transportConfig2); + Assert.assertEquals(transportConfig.newTransportConfig(""), transportConfig2.newTransportConfig("")); + + // The equals here shouldn't take the name into consideration + Assert.assertEquals(config, config2); + } + + @Test + public void testNotEquals() throws Exception { + String factClass = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory"; + + TransportConfiguration transportConfig = new TransportConfiguration(factClass, null); + XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig}, + null, null, null); + + TransportConfiguration transportConfig2 = new TransportConfiguration(factClass + "2", null); + XARecoveryConfig config2 = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig2}, + null, null, null); + + // They are using Different names + Assert.assertNotEquals(transportConfig, transportConfig2); + Assert.assertNotEquals(transportConfig.newTransportConfig(""), transportConfig2.newTransportConfig("")); + + // The equals here shouldn't take the name into consideration + Assert.assertNotEquals(config, config2); + } +} diff --git a/examples/features/standard/interceptor-client/pom.xml b/examples/features/standard/interceptor-client/pom.xml new file mode 100644 index 0000000000..5dd979c334 --- /dev/null +++ b/examples/features/standard/interceptor-client/pom.xml @@ -0,0 +1,110 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.broker + jms-examples + 1.1.1-SNAPSHOT + + + interceptor-client + jar + ActiveMQ Artemis JMS Interceptor Example + + + ${project.basedir}/../../../.. + + + + + org.apache.activemq + artemis-jms-client + ${project.version} + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create + + create + + + org.apache.activemq.examples.broker:interceptor:${project.version} + ${noServer} + ${basedir}/target/classes/activemq/server0 + + + + start + + cli + + + ${noServer} + true + tcp://localhost:61616 + + run + + + + + runClient + + runClient + + + org.apache.activemq.artemis.jms.example.InterceptorExample + + + + stop + + cli + + + ${noServer} + + stop + + + + + + + org.apache.activemq.examples.broker + interceptor-client + ${project.version} + + + + + + + diff --git a/examples/features/standard/interceptor-client/readme.html b/examples/features/standard/interceptor-client/readme.html new file mode 100644 index 0000000000..42b1e18570 --- /dev/null +++ b/examples/features/standard/interceptor-client/readme.html @@ -0,0 +1,72 @@ + + + + + ActiveMQ Artemis JMS Interceptor Example + + + + + +

JMS Interceptor Example

+ +
To run the example, simply type mvn verify from this directory, 
or mvn -PnoServer verify if you want to start and create the server manually.
+ + +

This example shows you how to implement and configure a simple incoming, server-side interceptor with ActiveMQ Artemis.

+ +

ActiveMQ Artemis allows an application to use an interceptor to hook into the messaging system. All that needs to do is to implement the + Interceptor interface, as defined below:

+
+     
+         public interface Interceptor
+         {
+            boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException;
+         }
+     
+     
+

Once you have your own interceptor class, add it to the broker.xml, as follows:

+
+     
+        <configuration>
+        ...
+           <remoting-incoming-interceptors>
+              <class-name>org.apache.activemq.artemis.jms.example.SimpleInterceptor</class-name>
+           </remoting-incoming-interceptors>
+        ...
+        </configuration>
+     
+     
+ +

With interceptor, you can handle various events in message processing. In this example, a simple interceptor, SimpleInterceptor, is implemented and configured. + When the example is running, the interceptor will print out each events that are passed in the interceptor. And it will add a string property to the message being + delivered. You can see that after the message is received, there will be a new string property appears in the received message.

+ +

With our interceptor we always return true from the intercept method. If we were + to return false that signifies that no more interceptors are to run or the target + is not to be called. Return false to abort processing of the packet.

+ + + + + + + + diff --git a/examples/features/standard/interceptor-client/src/main/java/org/apache/activemq/artemis/jms/example/InterceptorExample.java b/examples/features/standard/interceptor-client/src/main/java/org/apache/activemq/artemis/jms/example/InterceptorExample.java new file mode 100644 index 0000000000..e71f3fdd7f --- /dev/null +++ b/examples/features/standard/interceptor-client/src/main/java/org/apache/activemq/artemis/jms/example/InterceptorExample.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; + +/** + * A simple JMS example that shows how to implement and use interceptors with ActiveMQ Artemis. + */ +public class InterceptorExample { + + public static void main(final String[] args) throws Exception { + Connection connection = null; + try { + ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?incomingInterceptorList=" + SimpleInterceptor.class.getName()); + connection = cf.createConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue("exampleQueue"); + + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("This is a text message"); + + System.out.println("Sending message [" + message.getText() + + "] with String property: " + + message.getStringProperty("newproperty")); + + producer.send(message); + + MessageConsumer messageConsumer = session.createConsumer(queue); + + connection.start(); + + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + + System.out.println("Received message [" + messageReceived.getText() + + "] with String property: " + + messageReceived.getStringProperty("newproperty")); + + if (messageReceived.getStringProperty("newproperty") == null) { + throw new IllegalStateException("Check your configuration as the example interceptor wasn't actually called!"); + } + } + finally { + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/examples/features/standard/interceptor-client/src/main/java/org/apache/activemq/artemis/jms/example/SimpleInterceptor.java b/examples/features/standard/interceptor-client/src/main/java/org/apache/activemq/artemis/jms/example/SimpleInterceptor.java new file mode 100644 index 0000000000..5d00f37900 --- /dev/null +++ b/examples/features/standard/interceptor-client/src/main/java/org/apache/activemq/artemis/jms/example/SimpleInterceptor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; + +/** + * A simple Interceptor implementation + */ +public class SimpleInterceptor implements Interceptor { + + public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException { + System.out.println("SimpleInterceptor gets called!"); + System.out.println("Packet: " + packet.getClass().getName()); + System.out.println("RemotingConnection: " + connection.getRemoteAddress()); + + if (packet instanceof SessionReceiveMessage) { + SessionReceiveMessage realPacket = (SessionReceiveMessage) packet; + Message msg = realPacket.getMessage(); + msg.putStringProperty(new SimpleString("newproperty"), new SimpleString("Hello from interceptor!")); + } + // We return true which means "call next interceptor" (if there is one) or target. + // If we returned false, it means "abort call" - no more interceptors would be called and neither would + // the target + return true; + } + +} diff --git a/examples/features/standard/pom.xml b/examples/features/standard/pom.xml index 62c8cac9a7..fe56e8764f 100644 --- a/examples/features/standard/pom.xml +++ b/examples/features/standard/pom.xml @@ -54,6 +54,7 @@ under the License. expiry http-transport interceptor + interceptor-client instantiate-connection-factory jms-auto-closeable jms-bridge @@ -112,6 +113,7 @@ under the License. expiry http-transport interceptor + interceptor-client jms-auto-closeable instantiate-connection-factory jms-bridge diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java new file mode 100644 index 0000000000..3768f8b822 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolManagerTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.extras.protocols.hornetq; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration; +import org.apache.activemq.artemis.jms.server.config.JMSConfiguration; +import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.artemis.ra.recovery.RecoveryManager; +import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class HornetQProtocolManagerTest extends ActiveMQTestBase { + + ActiveMQServer server; + EmbeddedJMS embeddedJMS; + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration configuration = createDefaultConfig(false); + configuration.setPersistenceEnabled(false); + configuration.getAcceptorConfigurations().clear(); + configuration.addAcceptorConfiguration("legacy", "tcp://localhost:61616?protocols=HORNETQ"). + addAcceptorConfiguration("corepr", "tcp://localhost:61617?protocols=CORE"); + + configuration.addConnectorConfiguration("legacy", "tcp://localhost:61616"); + JMSConfiguration jmsConfiguration = new JMSConfigurationImpl(); + + jmsConfiguration.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName("testQueue").setBindings("testQueue")); + embeddedJMS = new EmbeddedJMS(); + embeddedJMS.setConfiguration(configuration); + embeddedJMS.setJmsConfiguration(jmsConfiguration); + embeddedJMS.start(); + } + + public void tearDown() throws Exception { + embeddedJMS.stop(); + super.tearDown(); + } + + @Test + public void testLegacy() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=" + HornetQClientProtocolManagerFactory.class.getName()); + connectionFactory.createConnection().close(); + ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory("tcp://localhost:61617"); + connectionFactory2.createConnection().close(); + + RecoveryManager manager = new RecoveryManager(); + manager.register(connectionFactory, null, null, new ConcurrentHashMap()); + manager.register(connectionFactory2, null, null, new ConcurrentHashMap()); + + for (XARecoveryConfig resource :manager.getResources()) { + ServerLocator locator = resource.createServerLocator(); + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(); + session.close(); + factory.close(); + locator.close(); + } + + } + + + /** This test will use an ArtemisConnectionFactory with clientProtocolManager=*/ + @Test + public void testLegacy2() throws Exception { + + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(); + configuration.setConnectorNames("legacy"); + configuration.setName("legacy"); + configuration.setProtocolManagerFactoryStr(HornetQClientProtocolManagerFactory.class.getName()); + embeddedJMS.getJMSServerManager().createConnectionFactory(false, configuration, "legacy"); + + Queue queue = (Queue) embeddedJMS.lookup("testQueue"); + + + ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) embeddedJMS.lookup("legacy"); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("Test"); + for (int i = 0; i < 5; i++) { + message.setStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID.toString(), "duplicate"); + producer.send(message); + } + + connection.start(); + MessageConsumer consumer = session.createConsumer(queue); + TextMessage messageRec = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(messageRec); + + Assert.assertEquals("Test", messageRec.getText()); + Assert.assertNull(consumer.receiveNoWait()); + connection.close(); + connectionFactory.close(); + + } + + +} + diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java index 789e73a6a6..c4b17f67ac 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java @@ -17,6 +17,10 @@ package org.apache.activemq.artemis.tests.extras.protocols.hornetq; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -29,15 +33,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.hornetq.api.core.client.HornetQClient; +import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - public class HornetQProtocolTest extends ActiveMQTestBase { protected ActiveMQServer server; @@ -59,6 +61,13 @@ public class HornetQProtocolTest extends ActiveMQTestBase { waitForServerToStart(server); } + + @After + public void tearDown() throws Exception { + org.hornetq.core.client.impl.ServerLocatorImpl.clearThreadPools(); + super.tearDown(); + } + @Test public void testMessagePropertiesAreTransformedBetweenCoreAndHQProtocols() throws Exception { org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession(); @@ -83,6 +92,7 @@ public class HornetQProtocolTest extends ActiveMQTestBase { } ClientMessage coreMessage1 = coreConsumer.receive(1000); + System.err.println("Messages::==" + coreMessage1); assertTrue(coreMessage1.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID)); coreSession.close(); @@ -93,6 +103,39 @@ public class HornetQProtocolTest extends ActiveMQTestBase { hqSession.close(); } + @Test + public void testLargeMessagesOverHornetQClients() throws Exception { + org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession(); + + // Create Queue + String queueName = "test.hq.queue"; + hqSession.createQueue(queueName, queueName, true); + + // HornetQ Client Objects + hqSession.start(); + org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName); + org.hornetq.api.core.client.ClientConsumer hqConsumer = hqSession.createConsumer(queueName); + + for (int i = 0; i < 2; i++) { + org.hornetq.api.core.client.ClientMessage hqMessage = hqSession.createMessage(true); + hqMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024)); + hqProducer.send(hqMessage); + } + hqSession.commit(); + + for (int i = 0; i < 2; i++) { + org.hornetq.api.core.client.ClientMessage coreMessage1 = hqConsumer.receive(1000); + coreMessage1.acknowledge(); + System.err.println("Messages::==" + coreMessage1); + for (int j = 0; j < 10 * 1024; j++) { + Assert.assertEquals(ActiveMQTestBase.getSamplebyte(j), coreMessage1.getBodyBuffer().readByte()); + } + + } + + hqSession.close(); + } + @Test public void testDuplicateIDPropertyWithHornetQProtocol() throws Exception { org.hornetq.api.core.client.ClientSession session = createHQClientSession(); @@ -158,14 +201,14 @@ public class HornetQProtocolTest extends ActiveMQTestBase { } private org.hornetq.api.core.client.ClientMessage createHQTestMessage(org.hornetq.api.core.client.ClientSession session) { - org.hornetq.api.core.client.ClientMessage message = session.createMessage(false); + org.hornetq.api.core.client.ClientMessage message = session.createMessage(true); String v = UUID.randomUUID().toString(); message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v); return message; } private ClientMessage createCoreTestMessage(ClientSession session) { - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(true); String v = UUID.randomUUID().toString(); message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v); return message; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java index 7a20c9ac36..0d52d05763 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java @@ -55,7 +55,7 @@ public class ClusterControllerTest extends ClusterTestBase { @Test public void controlWithDifferentConnector() throws Exception { try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) { - locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool()); ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory()); clusterControl.authorize(); @@ -65,7 +65,7 @@ public class ClusterControllerTest extends ClusterTestBase { @Test public void controlWithDifferentPassword() throws Exception { try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) { - locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool()); ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory()); try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index ae81eed0c2..d3ece006e9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -1329,7 +1329,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { locators[node] = ActiveMQClient.createServerLocatorWithoutHA(serverTotc); } - locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance()); + locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node])); locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); addServerLocator(locators[node]); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Incoming.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Incoming.java new file mode 100644 index 0000000000..a517764501 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Incoming.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.interceptors; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; + +public class Incoming implements Interceptor { + + public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException { + + System.out.println("Incoming:Packet : " + packet); + if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) { + SessionReceiveMessage p = (SessionReceiveMessage) packet; + + p.getMessage().putStringProperty("Incoming", "was here"); + + } + + return true; + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/InterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/InterceptorTest.java similarity index 93% rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/InterceptorTest.java rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/InterceptorTest.java index c87833b8c7..d507ce2681 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/InterceptorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/InterceptorTest.java @@ -14,7 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.tests.integration; +package org.apache.activemq.artemis.tests.integration.interceptors; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -40,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -1027,4 +1038,52 @@ public class InterceptorTest extends ActiveMQTestBase { session.close(); } + + + @Test + public void testInterceptorOnURI() throws Exception { + locator.close(); + + String uri = "tcp://localhost:61616?incomingInterceptorList=" + Incoming.class.getCanonicalName() + "&outgoingInterceptorList=" + Outgoing.class.getName(); + + System.out.println(uri); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); + + // Serialize stuff to make sure the interceptors are on the URI + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); + objectOutputStream.close(); + objectOutputStream.writeObject(factory); + ByteArrayInputStream input = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); + ObjectInputStream objectInputStream = new ObjectInputStream(input); + + factory = (ActiveMQConnectionFactory) objectInputStream.readObject(); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE.toString())); + + producer.send(session.createTextMessage("HelloMessage")); + + + connection.start(); + + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE.toString())); + + TextMessage msg = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(consumer); + + Assert.assertEquals("HelloMessage", msg.getText()); + + Assert.assertEquals("was here", msg.getStringProperty("Incoming")); + + Assert.assertEquals("sending", msg.getStringProperty("Outgoing")); + + connection.close(); + + factory.close(); + + + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Outgoing.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Outgoing.java new file mode 100644 index 0000000000..bc6e44aec4 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/interceptors/Outgoing.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.interceptors; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; + +public class Outgoing implements Interceptor { + + public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException { + System.out.println("Outgoin:Packet : " + packet); + if (packet.getType() == PacketImpl.SESS_SEND) { + SessionSendMessage p = (SessionSendMessage) packet; + + p.getMessage().putStringProperty("Outgoing", "sending"); + + } + + return true; + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java index d1fedeb557..79ccd2829a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java @@ -44,19 +44,32 @@ public class ConnectionTest extends JMSTestBase { @Test public void testThroughNewConnectionFactory() throws Exception { - testThroughNewConnectionFactory(new ActiveMQConnectionFactory("vm://0")); - testThroughNewConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616?&blockOnNonDurableSend=true&" + - "retryIntervalMultiplier=1.0&maxRetryInterval=2000&producerMaxRate=-1&" + - "blockOnDurableSend=true&connectionTTL=60000&compressLargeMessage=false&reconnectAttempts=0&" + - "cacheLargeMessagesClient=false&scheduledThreadPoolMaxSize=5&useGlobalPools=true&" + - "callFailoverTimeout=-1&initialConnectAttempts=1&clientFailureCheckPeriod=30000&" + - "blockOnAcknowledge=true&consumerWindowSize=1048576&minLargeMessageSize=102400&" + - "autoGroup=false&threadPoolMaxSize=-1&confirmationWindowSize=-1&" + - "transactionBatchSize=1048576&callTimeout=30000&preAcknowledge=false&" + - "connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance." + - "RoundRobinConnectionLoadBalancingPolicy&dupsOKBatchSize=1048576&initialMessagePacketSize=1500&" + - "consumerMaxRate=-1&retryInterval=2000&failoverOnInitialConnection=false&producerWindowSize=65536&" + - "port=61616&host=localhost#")); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0"); + testThroughNewConnectionFactory(connectionFactory); + + // Run it again with a cloned through serialization CF, simulating JNDI lookups + connectionFactory = serialClone(connectionFactory); + testThroughNewConnectionFactory(connectionFactory); + + + connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?&blockOnNonDurableSend=true&" + + "retryIntervalMultiplier=1.0&maxRetryInterval=2000&producerMaxRate=-1&" + + "blockOnDurableSend=true&connectionTTL=60000&compressLargeMessage=false&reconnectAttempts=0&" + + "cacheLargeMessagesClient=false&scheduledThreadPoolMaxSize=5&useGlobalPools=true&" + + "callFailoverTimeout=-1&initialConnectAttempts=1&clientFailureCheckPeriod=30000&" + + "blockOnAcknowledge=true&consumerWindowSize=1048576&minLargeMessageSize=102400&" + + "autoGroup=false&threadPoolMaxSize=-1&confirmationWindowSize=-1&" + + "transactionBatchSize=1048576&callTimeout=30000&preAcknowledge=false&" + + "connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance." + + "RoundRobinConnectionLoadBalancingPolicy&dupsOKBatchSize=1048576&initialMessagePacketSize=1500&" + + "consumerMaxRate=-1&retryInterval=2000&failoverOnInitialConnection=false&producerWindowSize=65536&" + + "port=61616&host=localhost#"); + + testThroughNewConnectionFactory(connectionFactory); + + // Run it again with a cloned through serialization CF, simulating JNDI lookups + connectionFactory = serialClone(connectionFactory); + testThroughNewConnectionFactory(connectionFactory); } private void testThroughNewConnectionFactory(ActiveMQConnectionFactory factory) throws Exception { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java index 808214c401..080b837484 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java @@ -384,6 +384,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase { " JgroupsChannelRefName" + " java.lang.String" + " " + + " " + + " " + + " ProtocolManagerConfig" + + " ProtocolManagerFactoryStr" + + " java.lang.String" + + " " + " "; private static String rootConfig = "" + config + commentedOutConfigs + ""; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java index 74bed994bf..7927206dac 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java @@ -41,6 +41,8 @@ public class ConnectionFactoryPropertiesTest extends ActiveMQTestBase { static { UNSUPPORTED_CF_PROPERTIES = new TreeSet(); UNSUPPORTED_CF_PROPERTIES.add("discoveryGroupName"); + UNSUPPORTED_CF_PROPERTIES.add("incomingInterceptorList"); + UNSUPPORTED_CF_PROPERTIES.add("outgoingInterceptorList"); UNSUPPORTED_RA_PROPERTIES = new TreeSet(); UNSUPPORTED_RA_PROPERTIES.add("HA");