This closes #191 Protocol changes

This commit is contained in:
Clebert Suconic 2015-10-08 22:55:09 -04:00
commit fcf18a7644
63 changed files with 1601 additions and 156 deletions

View File

@ -174,10 +174,19 @@ public class TransportConfiguration implements Serializable {
TransportConfiguration that = (TransportConfiguration) o;
if (!factoryClassName.equals(that.factoryClassName))
if (!isSameHost(that)) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null)
return false;
return true;
}
public boolean isSameHost(TransportConfiguration that) {
if (!factoryClassName.equals(that.factoryClassName))
return false;
if (params != null ? !params.equals(that.params) : that.params != null)
return false;

View File

@ -769,6 +769,20 @@ public interface ServerLocator extends AutoCloseable {
ClientProtocolManagerFactory getProtocolManagerFactory();
void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager);
ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager);
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
ServerLocator setIncomingInterceptorList(String interceptorList);
String getIncomingInterceptorList();
ServerLocator setOutgoingInterceptorList(String interceptorList);
String getOutgoingInterceptorList();
}

View File

@ -1269,7 +1269,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean isLast) {
// if it is our connector then set the live id used for failover
if (connectorPair.getA() != null && connectorPair.getA().equals(connectorConfig)) {
if (connectorPair.getA() != null && connectorPair.getA().isSameHost(connectorConfig)) {
liveNodeID = nodeID;
}
serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast);

View File

@ -80,7 +80,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private static final long serialVersionUID = -1615857864410205260L;
// This is the default value
private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance();
private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this);
private final boolean ha;
@ -201,12 +201,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private TransportConfiguration clusterTransportConfiguration;
/*
* *************WARNING***************
* remember that when adding any new classes that we have to support serialization with previous clients.
* If you need to, make them transient and handle the serialization yourself
* */
private final Exception traceException = new Exception();
// To be called when there are ServerLocator being finalized.
@ -619,14 +613,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientProtocolManagerFactory getProtocolManagerFactory() {
if (protocolManagerFactory == null) {
// this could happen over serialization from older versions
protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance();
// Default one in case it's null
protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(this);
}
return protocolManagerFactory;
}
public void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) {
public ServerLocator setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) {
this.protocolManagerFactory = protocolManagerFactory;
protocolManagerFactory.setLocator(this);
return this;
}
public void disableFinalizeCheck() {
@ -860,10 +856,41 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return factory;
}
@Override
public boolean isHA() {
return ha;
}
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
@Override
public ServerLocator setIncomingInterceptorList(String interceptorList) {
feedInterceptors(incomingInterceptors, interceptorList);
return this;
}
@Override
public String getIncomingInterceptorList() {
return fromInterceptors(incomingInterceptors);
}
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
@Override
public ServerLocator setOutgoingInterceptorList(String interceptorList) {
feedInterceptors(outgoingInterceptors, interceptorList);
return this;
}
@Override
public String getOutgoingInterceptorList() {
return fromInterceptors(outgoingInterceptors);
}
public boolean isCacheLargeMessagesClient() {
return cacheLargeMessagesClient;
}
@ -1775,4 +1802,40 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public boolean isReceivedToplogy() {
return receivedTopology;
}
private String fromInterceptors(final List<Interceptor> interceptors) {
StringBuffer buffer = new StringBuffer();
boolean first = true;
for (Interceptor value : interceptors) {
if (!first) {
buffer.append(",");
}
first = false;
buffer.append(value.getClass().getName());
}
return buffer.toString();
}
private void feedInterceptors(final List<Interceptor> interceptors, final String interceptorList) {
interceptors.clear();
if (interceptorList == null || interceptorList.trim().equals("")) {
return;
}
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
String[] arrayInterceptor = interceptorList.split(",");
for (String strValue : arrayInterceptor) {
Interceptor interceptor = (Interceptor) ClassloadingUtil.newInstanceFromClassLoader(strValue.trim());
interceptors.add(interceptor);
}
return null;
}
});
}
}

View File

@ -19,10 +19,10 @@ package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
public interface ServerLocatorInternal extends ServerLocator {

View File

@ -107,7 +107,7 @@ public final class TopologyMemberImpl implements TopologyMember {
}
public boolean isMember(TransportConfiguration configuration) {
if (getConnector().getA() != null && getConnector().getA().equals(configuration) || getConnector().getB() != null && getConnector().getB().equals(configuration)) {
if (getConnector().getA() != null && getConnector().getA().isSameHost(configuration) || getConnector().getB() != null && getConnector().getB().isSameHost(configuration)) {
return true;
}
else {

View File

@ -276,7 +276,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
long sessionChannelID = connection.generateChannelID();
Packet request = new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
Packet request = newCreateSessionPacket(clientVersion, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, minLargeMessageSize, confirmationWindowSize, sessionChannelID);
try {
// channel1 reference here has to go away
@ -325,10 +325,30 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
inCreateSessionLatch.countDown();
}
} while (retry);
return newSessionContext(name, confirmationWindowSize, sessionChannel, response);
}
protected Packet newCreateSessionPacket(Version clientVersion,
String name,
String username,
String password,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
int minLargeMessageSize,
int confirmationWindowSize,
long sessionChannelID) {
return new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
}
protected SessionContext newSessionContext(String name,
int confirmationWindowSize,
Channel sessionChannel,
CreateSessionResponseMessage response) {
// these objects won't be null, otherwise it would keep retrying on the previous loop
return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
}
public boolean cleanupBeforeFailover(ActiveMQException cause) {
@ -398,7 +418,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
return connection;
}
private void sendHandshake(Connection transportConnection) {
protected void sendHandshake(Connection transportConnection) {
if (transportConnection.isUsingProtocolHandling()) {
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
@ -23,13 +24,25 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag
private static final long serialVersionUID = 1;
private static final ActiveMQClientProtocolManagerFactory INSTANCE = new ActiveMQClientProtocolManagerFactory();
private ActiveMQClientProtocolManagerFactory() {
}
public static final ActiveMQClientProtocolManagerFactory getInstance() {
return INSTANCE;
ServerLocator locator;
@Override
public ServerLocator getLocator() {
return locator;
}
@Override
public void setLocator(ServerLocator locator) {
this.locator = locator;
}
public static final ActiveMQClientProtocolManagerFactory getInstance(ServerLocator locator) {
ActiveMQClientProtocolManagerFactory factory = new ActiveMQClientProtocolManagerFactory();
factory.setLocator(locator);
return factory;
}
public ClientProtocolManager newProtocolManager() {

View File

@ -114,6 +114,19 @@ public class ActiveMQSessionContext extends SessionContext {
private int confirmationWindow;
private final String name;
protected Channel getSessionChannel() {
return sessionChannel;
}
protected String getName() {
return name;
}
protected int getConfirmationWindow() {
return confirmationWindow;
}
public ActiveMQSessionContext(String name,
RemotingConnection remotingConnection,
Channel sessionChannel,
@ -536,7 +549,7 @@ public class ActiveMQSessionContext extends SessionContext {
final boolean autoCommitAcks,
final boolean preAcknowledge,
final SimpleString defaultAddress) throws ActiveMQException {
Packet createRequest = new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress);
boolean retry;
do {
try {
@ -564,6 +577,17 @@ public class ActiveMQSessionContext extends SessionContext {
} while (retry && !session.isClosing());
}
protected CreateSessionMessage newCreateSession(String username,
String password,
int minLargeMessageSize,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
SimpleString defaultAddress) {
return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
}
@Override
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException {
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
@ -724,7 +748,7 @@ public class ActiveMQSessionContext extends SessionContext {
return ((ActiveMQConsumerContext) consumer.getConsumerContext()).getId();
}
private ClassLoader lookupTCCL() {
protected ClassLoader lookupTCCL() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
return Thread.currentThread().getContextClassLoader();
@ -733,7 +757,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
private int calcWindowSize(final int windowSize) {
protected int calcWindowSize(final int windowSize) {
int clientWindowSize;
if (windowSize == -1) {
// No flow control - buffer can increase without bound! Only use with

View File

@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public abstract class MessagePacket extends PacketImpl {
public abstract class MessagePacket extends PacketImpl implements MessagePacketI {
protected MessageInternal message;

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.Message;
public interface MessagePacketI {
Message getMessage();
}

View File

@ -17,10 +17,11 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionReceiveLargeMessage extends PacketImpl {
public class SessionReceiveLargeMessage extends PacketImpl implements MessagePacketI {
private final MessageInternal message;
@ -58,6 +59,11 @@ public class SessionReceiveLargeMessage extends PacketImpl {
return message;
}
@Override
public Message getMessage() {
return message;
}
public long getConsumerID() {
return consumerID;
}

View File

@ -17,10 +17,11 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionSendLargeMessage extends PacketImpl {
public class SessionSendLargeMessage extends PacketImpl implements MessagePacketI {
/**
* Used only if largeMessage
@ -43,6 +44,11 @@ public class SessionSendLargeMessage extends PacketImpl {
return largeMessage;
}
@Override
public Message getMessage() {
return largeMessage;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
largeMessage.encodeHeadersAndProperties(buffer);

View File

@ -16,7 +16,13 @@
*/
package org.apache.activemq.artemis.spi.core.remoting;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public interface ClientProtocolManagerFactory {
ClientProtocolManager newProtocolManager();
void setLocator(ServerLocator locator);
ServerLocator getLocator();
}

View File

@ -39,6 +39,8 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -50,8 +52,10 @@ import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.jms.referenceable.ConnectionFactoryObjectFactory;
import org.apache.activemq.artemis.jms.referenceable.SerializableObjectRefAddr;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
import org.apache.activemq.artemis.uri.ConnectionFactoryParser;
import org.apache.activemq.artemis.uri.ServerLocatorParser;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
/**
* <p>ActiveMQ Artemis implementation of a JMS ConnectionFactory.</p>
@ -73,6 +77,8 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
private String password;
private String protocolManagerFactoryStr;
public void writeExternal(ObjectOutput out) throws IOException {
URI uri = toURI();
@ -121,6 +127,27 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
return uri;
}
public String getProtocolManagerFactoryStr() {
return protocolManagerFactoryStr;
}
public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) {
if (protocolManagerFactoryStr != null && !protocolManagerFactoryStr.trim().isEmpty()) {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
ClientProtocolManagerFactory protocolManagerFactory =
(ClientProtocolManagerFactory) ClassloadingUtil.newInstanceFromClassLoader(protocolManagerFactoryStr);
serverLocator.setProtocolManagerFactory(protocolManagerFactory);
return null;
}
});
this.protocolManagerFactoryStr = protocolManagerFactoryStr;
}
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
String url = in.readUTF();
ConnectionFactoryParser parser = new ConnectionFactoryParser();
@ -606,6 +633,31 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
serverLocator.setInitialMessagePacketSize(size);
}
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
public void setIncomingInterceptorList(String interceptorList) {
checkWrite();
serverLocator.setIncomingInterceptorList(interceptorList);
}
public String getIncomingInterceptorList() {
return serverLocator.getIncomingInterceptorList();
}
/**
* @param interceptorList a comma separated string of incoming interceptor class names to be used. Each interceptor needs a default Constructor to be used with this method.
* @return this
*/
public void setOutgoingInterceptorList(String interceptorList) {
serverLocator.setOutgoingInterceptorList(interceptorList);
}
public String getOutgoingInterceptorList() {
return serverLocator.getOutgoingInterceptorList();
}
public ActiveMQConnectionFactory setUser(String user) {
checkWrite();
this.user = user;

View File

@ -26,6 +26,7 @@ import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -56,6 +57,14 @@ public class ConnectionFactoryURITest {
private static final String IPV6 = "fe80::baf6:b1ff:fe12:daf7%eth0";
private static Set<String> ignoreList = new HashSet<String>();
static {
ignoreList.add("protocolManagerFactoryStr");
ignoreList.add("incomingInterceptorList");
ignoreList.add("outgoingInterceptorList");
}
@Test
public void testIPv6() throws Exception {
Map<String,Object> params = new HashMap<>();
@ -379,6 +388,10 @@ public class ConnectionFactoryURITest {
ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException {
PropertyDescriptor[] descriptors = bean.getPropertyUtils().getPropertyDescriptors(factory);
for (PropertyDescriptor descriptor : descriptors) {
if (ignoreList.contains(descriptor.getName())) {
continue;
}
System.err.println("name::" + descriptor.getName());
if (descriptor.getWriteMethod() != null && descriptor.getReadMethod() != null) {
if (descriptor.getPropertyType() == String.class) {
String value = RandomUtil.randomString();

View File

@ -44,6 +44,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
ConnectionFactoryConfiguration setConnectorNames(List<String> connectorNames);
ConnectionFactoryConfiguration setConnectorNames(String...connectorNames);
boolean isHA();
ConnectionFactoryConfiguration setHA(boolean ha);
@ -170,5 +172,9 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
ConnectionFactoryConfiguration setFactoryType(JMSFactoryType factType);
ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr);
String getProtocolManagerFactoryStr();
JMSFactoryType getFactoryType();
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jms.server.config.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -113,6 +114,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
private String groupID = null;
private String protocolManagerFactoryStr;
private JMSFactoryType factoryType = JMSFactoryType.CF;
// Static --------------------------------------------------------
@ -170,6 +173,11 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
return this;
}
public ConnectionFactoryConfiguration setConnectorNames(final String...names) {
return this.setConnectorNames(Arrays.asList(names));
}
public boolean isHA() {
return ha;
}
@ -534,6 +542,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
factoryType = JMSFactoryType.valueOf(buffer.readInt());
protocolManagerFactoryStr = BufferHelper.readNullableSimpleStringAsString(buffer);
}
@Override
@ -618,6 +628,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.writeAsNullableSimpleString(buffer, groupID);
buffer.writeInt(factoryType.intValue());
BufferHelper.writeAsNullableSimpleString(buffer, protocolManagerFactoryStr);
}
@Override
@ -724,7 +736,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.sizeOfNullableSimpleString(groupID) +
DataConstants.SIZE_INT; // factoryType
DataConstants.SIZE_INT +
// factoryType
BufferHelper.sizeOfNullableSimpleString(protocolManagerFactoryStr);
return size;
}
@ -749,6 +764,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
return this.compressLargeMessage;
}
@Override
public ConnectionFactoryConfiguration setProtocolManagerFactoryStr(String protocolManagerFactoryStr) {
this.protocolManagerFactoryStr = protocolManagerFactoryStr;
return this;
}
@Override
public String getProtocolManagerFactoryStr() {
return protocolManagerFactoryStr;
}
// Public --------------------------------------------------------
// Package protected ---------------------------------------------

View File

@ -955,7 +955,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
public void runException() throws Exception {
checkBindings(bindings);
ActiveMQConnectionFactory cf = internalCreateCF(storeConfig, cfConfig);
ActiveMQConnectionFactory cf = internalCreateCF(cfConfig);
ArrayList<String> bindingsToAdd = new ArrayList<String>();
@ -1075,8 +1075,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
* @param cfConfig
* @throws Exception
*/
private ActiveMQConnectionFactory internalCreateCF(final boolean persisted,
final ConnectionFactoryConfiguration cfConfig) throws Exception {
private ActiveMQConnectionFactory internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws Exception {
checkInitialised();
ActiveMQConnectionFactory cf = connectionFactories.get(cfConfig.getName());
@ -1168,6 +1167,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
cf.setGroupID(cfConfig.getGroupID());
cf.setProtocolManagerFactoryStr(cfConfig.getProtocolManagerFactoryStr());
return cf;
}
@ -1445,7 +1445,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
List<PersistedConnectionFactory> cfs = storage.recoverConnectionFactories();
for (PersistedConnectionFactory cf : cfs) {
internalCreateCF(true, cf.getConfig());
internalCreateCF(cf.getConfig());
}
List<PersistedDestination> destinations = storage.recoverDestinations();

View File

@ -85,6 +85,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
// no op
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection);

View File

@ -19,76 +19,36 @@ package org.apache.activemq.artemis.core.protocol.hornetq;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI;
import org.apache.activemq.artemis.core.protocol.hornetq.util.HQPropertiesConverter;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
public class HQPropertiesConversionInterceptor implements Interceptor {
private static Map<SimpleString, SimpleString> dictionary;
static {
Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
private final boolean replaceHQ;
// Add entries for outgoing messages
d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
// Add entries for incoming messages
d.put(new SimpleString("_AMQ_ACTUAL_EXPIRY"), new SimpleString("_HQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_AMQ_ORIG_ADDRESS"), new SimpleString("_HQ_ORIG_ADDRESS"));
d.put(new SimpleString("_AMQ_ORIG_QUEUE"), new SimpleString("_HQ_ORIG_QUEUE"));
d.put(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), new SimpleString("_HQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_AMQ_GROUP_ID"), new SimpleString("_HQ_GROUP_ID"));
d.put(new SimpleString("_AMQ_LARGE_COMPRESSED"), new SimpleString("_HQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_AMQ_LARGE_SIZE"), new SimpleString("_HQ_LARGE_SIZE"));
d.put(new SimpleString("_AMQ_SCHED_DELIVERY"), new SimpleString("_HQ_SCHED_DELIVERY"));
d.put(new SimpleString("_AMQ_DUPL_ID"), new SimpleString("_HQ_DUPL_ID"));
d.put(new SimpleString("_AMQ_LVQ_NAME"), new SimpleString("_HQ_LVQ_NAME"));
dictionary = Collections.unmodifiableMap(d);
public HQPropertiesConversionInterceptor(final boolean replaceHQ) {
this.replaceHQ = replaceHQ;
}
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (isMessagePacket(packet)) {
handleReceiveMessage((MessagePacket) packet);
if (HQPropertiesConverter.isMessagePacket(packet)) {
handleReceiveMessage((MessagePacketI) packet);
}
return true;
}
private void handleReceiveMessage(MessagePacket messagePacket) {
Message message = messagePacket.getMessage();
// We are modifying the key set so we iterate over a shallow copy.
for (SimpleString property : new HashSet<>(message.getPropertyNames())) {
if (dictionary.containsKey(property)) {
message.putObjectProperty(dictionary.get(property), message.removeProperty(property));
}
private void handleReceiveMessage(MessagePacketI messagePacket) {
if (replaceHQ) {
HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage());
}
else {
HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage());
}
}
private boolean isMessagePacket(Packet packet) {
int type = packet.getType();
return type == PacketImpl.SESS_SEND ||
type == PacketImpl.SESS_SEND_CONTINUATION ||
type == PacketImpl.SESS_SEND_LARGE ||
type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
type == PacketImpl.SESS_RECEIVE_MSG;
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.protocol.hornetq;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
@ -23,9 +26,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* HornetQ Protocol Manager
*/
@ -53,6 +53,12 @@ class HornetQProtocolManager extends CoreProtocolManager {
}
}
@Override
public boolean acceptsNoHandshake() {
return true;
}
@Override
public boolean isProtocol(byte[] array) {
String frameStart = new String(array, StandardCharsets.US_ASCII);

View File

@ -34,9 +34,8 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory {
public ProtocolManager createProtocolManager(final ActiveMQServer server,
final List<Interceptor> incomingInterceptors,
List<Interceptor> outgoingInterceptors) {
Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor();
incomingInterceptors.add(propertyConversionInterceptor);
outgoingInterceptors.add(propertyConversionInterceptor);
incomingInterceptors.add(new HQPropertiesConversionInterceptor(true));
outgoingInterceptors.add(new HQPropertiesConversionInterceptor(false));
return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq.client;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.apache.activemq.artemis.core.version.Version;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager {
private static final int VERSION_PLAYED = 123;
protected void sendHandshake(Connection transportConnection) {
}
protected SessionContext newSessionContext(String name,
int confirmationWindowSize,
Channel sessionChannel,
CreateSessionResponseMessage response) {
// these objects won't be null, otherwise it would keep retrying on the previous loop
return new HornetQClientSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
}
@Override
protected Packet newCreateSessionPacket(Version clientVersion,
String name,
String username,
String password,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
int minLargeMessageSize,
int confirmationWindowSize,
long sessionChannelID) {
return new CreateSessionMessage(name, sessionChannelID, VERSION_PLAYED, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
}
@Override
public void sendSubscribeTopology(final boolean isServer) {
getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED));
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq.client;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory {
ServerLocator locator;
@Override
public ServerLocator getLocator() {
return locator;
}
@Override
public void setLocator(ServerLocator locator) {
this.locator = locator;
locator.addIncomingInterceptor(new HQPropertiesConversionInterceptor(true));
locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false));
}
@Override
public ClientProtocolManager newProtocolManager() {
return new HornetQClientProtocolManager();
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq.client;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.client.impl.AddressQueryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
public class HornetQClientSessionContext extends ActiveMQSessionContext {
public HornetQClientSessionContext(String name,
RemotingConnection remotingConnection,
Channel sessionChannel,
int serverVersion,
int confirmationWindow) {
super(name, remotingConnection, sessionChannel, serverVersion, confirmationWindow);
}
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
return response.toQueueQuery();
}
protected CreateSessionMessage newCreateSession(String username,
String password,
int minLargeMessageSize,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
SimpleString defaultAddress) {
return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString());
}
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false);
}
public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,
int windowSize,
int maxRate,
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException {
long consumerID = idGenerator.generateID();
ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
// The value we send is just a hint
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.hornetq.util;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class HQPropertiesConverter {
private static Map<SimpleString, SimpleString> hqAmqDictionary;
private static Map<SimpleString, SimpleString> amqHqDictionary;
static {
Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
// Add entries for outgoing messages
d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
hqAmqDictionary = Collections.unmodifiableMap(d);
d = new HashMap<>();
// inverting the direction
for (Map.Entry<SimpleString, SimpleString> entry: hqAmqDictionary.entrySet()) {
d.put(entry.getValue(), entry.getKey());
}
amqHqDictionary = Collections.unmodifiableMap(d);
}
public static void replaceAMQProperties(final Message message) {
replaceDict(message, amqHqDictionary);
}
public static void replaceHQProperties(final Message message) {
replaceDict(message, hqAmqDictionary);
}
private static void replaceDict(final Message message, Map<SimpleString, SimpleString> dictionary) {
for (SimpleString property : new HashSet<>(message.getPropertyNames())) {
SimpleString replaceTo = dictionary.get(property);
if (replaceTo != null) {
message.putObjectProperty(replaceTo, message.removeProperty(property));
}
}
}
public static boolean isMessagePacket(Packet packet) {
int type = packet.getType();
return type == PacketImpl.SESS_SEND ||
type == PacketImpl.SESS_SEND_LARGE ||
type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
type == PacketImpl.SESS_RECEIVE_MSG;
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.List;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
@ -32,8 +34,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import java.util.List;
/**
* MQTTProtocolManager
*/
@ -79,6 +79,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
}
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
@Override
public void removeHandler(String name) {
// TODO add support for handlers

View File

@ -150,6 +150,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
public ProtocolManagerFactory<Interceptor> getFactory() {
return factory;
}

View File

@ -104,6 +104,11 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
this.outgoingInterceptors = outgoingInterceptors;
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
@Override
public ProtocolManagerFactory<StompFrameInterceptor> getFactory() {
return factory;

View File

@ -879,6 +879,22 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
raProperties.setProducerWindowSize(producerWindowSize);
}
public String getProtocolManagerFactoryStr() {
if (ActiveMQResourceAdapter.trace) {
ActiveMQRALogger.LOGGER.trace("getProtocolManagerFactoryStr()");
}
return raProperties.getProtocolManagerFactoryStr();
}
public void setProtocolManagerFactoryStr(final String protocolManagerFactoryStr) {
if (ActiveMQResourceAdapter.trace) {
ActiveMQRALogger.LOGGER.trace("setProtocolManagerFactoryStr(" + protocolManagerFactoryStr + ")");
}
raProperties.setProtocolManagerFactoryStr(protocolManagerFactoryStr);
}
/**
* Get min large message size
*
@ -1971,6 +1987,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
if (val5 != null) {
cf.setConnectionLoadBalancingPolicyClassName(val5);
}
val5 = overrideProperties.getProtocolManagerFactoryStr() != null ? overrideProperties.getProtocolManagerFactoryStr() : raProperties.getProtocolManagerFactoryStr();
if (val5 != null) {
cf.setProtocolManagerFactoryStr(val5);
}
}
public void setManagedConnectionFactory(ActiveMQRAManagedConnectionFactory activeMQRAManagedConnectionFactory) {

View File

@ -118,6 +118,8 @@ public class ConnectionFactoryProperties {
private String groupID;
private String protocolManagerFactoryStr;
/**
* @return the transportType
*/
@ -679,6 +681,14 @@ public class ConnectionFactoryProperties {
hasBeenUpdated = true;
}
public String getProtocolManagerFactoryStr() {
return protocolManagerFactoryStr;
}
public void setProtocolManagerFactoryStr(String protocolManagerFactoryStr) {
this.protocolManagerFactoryStr = protocolManagerFactoryStr;
}
public boolean isHasBeenUpdated() {
return hasBeenUpdated;
}
@ -890,6 +900,12 @@ public class ConnectionFactoryProperties {
}
else if (!this.producerWindowSize.equals(other.producerWindowSize))
return false;
else if (!protocolManagerFactoryStr.equals(other.protocolManagerFactoryStr))
return false;
if (this.protocolManagerFactoryStr == null) {
if (other.protocolManagerFactoryStr != null)
return false;
}
if (this.reconnectAttempts == null) {
if (other.reconnectAttempts != null)
return false;
@ -971,6 +987,7 @@ public class ConnectionFactoryProperties {
result = prime * result + ((compressLargeMessage == null) ? 0 : compressLargeMessage.hashCode());
result = prime * result + ((consumerWindowSize == null) ? 0 : consumerWindowSize.hashCode());
result = prime * result + ((producerWindowSize == null) ? 0 : producerWindowSize.hashCode());
result = prime * result + ((protocolManagerFactoryStr == null) ? 0 : protocolManagerFactoryStr.hashCode());
result = prime * result + ((consumerMaxRate == null) ? 0 : consumerMaxRate.hashCode());
result = prime * result + ((confirmationWindowSize == null) ? 0 : confirmationWindowSize.hashCode());
result = prime * result + ((failoverOnInitialConnection == null) ? 0 : failoverOnInitialConnection.hashCode());

View File

@ -257,6 +257,15 @@ public interface Configuration {
Configuration addAcceptorConfiguration(final TransportConfiguration infos);
/**
* Add an acceptor to the config
* @param name the name of the acceptor
* @param uri the URI of the acceptor
* @return this
* @throws Exception in case of Parsing errors on the URI
*/
Configuration addAcceptorConfiguration(String name, String uri) throws Exception;
Configuration clearAcceptorConfigurations();
/**
@ -271,6 +280,8 @@ public interface Configuration {
Configuration addConnectorConfiguration(final String key, final TransportConfiguration info);
Configuration addConnectorConfiguration(final String name, final String uri) throws Exception;
Configuration clearConnectorConfigurations();
/**

View File

@ -51,6 +51,8 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
public class ConfigurationImpl implements Configuration, Serializable {
@ -337,6 +339,19 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
public ConfigurationImpl addAcceptorConfiguration(final String name, final String uri) throws Exception {
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
for (TransportConfiguration config : configurations) {
addAcceptorConfiguration(config);
}
return this;
}
public ConfigurationImpl clearAcceptorConfigurations() {
acceptorConfigs.clear();
return this;
@ -356,6 +371,21 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
public ConfigurationImpl addConnectorConfiguration(final String name, final String uri) throws Exception {
ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser();
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
for (TransportConfiguration config : configurations) {
addConnectorConfiguration(name, config);
}
return this;
}
public ConfigurationImpl clearConnectorConfigurations() {
connectorConfigs.clear();
return this;

View File

@ -154,7 +154,15 @@ public class ProtocolHandler {
//if we get here we assume we use the core protocol as we match nothing else
if (protocolToUse == null) {
protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL;
for (Map.Entry<String, ProtocolManager> entry : protocolMap.entrySet()) {
if (entry.getValue().acceptsNoHandshake()) {
protocolToUse = entry.getKey();
break;
}
}
if (protocolToUse == null) {
protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL;
}
}
ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse);
ConnectionCreator channelHandler = nettyAcceptor.createConnectionCreator();

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -25,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
@ -98,6 +98,11 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}
@Override
public boolean acceptsNoHandshake() {
return false;
}
/**
* no need to implement this now
*
@ -162,23 +167,25 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
@Override
public boolean isProtocol(byte[] array) {
String frameStart = new String(array, StandardCharsets.US_ASCII);
return frameStart.startsWith("ACTIVEMQ");
return isArtemis(ActiveMQBuffers.wrappedBuffer(array));
}
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
//if we are not an old client then handshake
if (buffer.getByte(0) == 'A' &&
if (isArtemis(buffer)) {
buffer.readBytes(7);
}
}
private boolean isArtemis(ActiveMQBuffer buffer) {
return buffer.getByte(0) == 'A' &&
buffer.getByte(1) == 'R' &&
buffer.getByte(2) == 'T' &&
buffer.getByte(3) == 'E' &&
buffer.getByte(4) == 'M' &&
buffer.getByte(5) == 'I' &&
buffer.getByte(6) == 'S') {
//todo add some handshaking
buffer.readBytes(7);
}
buffer.getByte(6) == 'S';
}
@Override

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@ -27,10 +28,23 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactor
*/
public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolManagerFactory {
private static final ActiveMQServerSideProtocolManagerFactory INSTANCE = new ActiveMQServerSideProtocolManagerFactory();
public static ActiveMQServerSideProtocolManagerFactory getInstance() {
return INSTANCE;
ServerLocator locator;
@Override
public ServerLocator getLocator() {
return locator;
}
@Override
public void setLocator(ServerLocator locator) {
this.locator = locator;
}
public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator) {
ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory();
instance.setLocator(locator);
return instance;
}
private ActiveMQServerSideProtocolManagerFactory() {

View File

@ -207,7 +207,7 @@ public class BackupManager implements ActiveMQComponent {
backupServerLocator.setIdentity("backupLocatorFor='" + server + "'");
backupServerLocator.setReconnectAttempts(-1);
backupServerLocator.setInitialConnectAttempts(-1);
backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator));
}
}
@ -332,7 +332,7 @@ public class BackupManager implements ActiveMQComponent {
}
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
return locator;
}
return null;

View File

@ -182,7 +182,7 @@ public class ClusterController implements ActiveMQComponent {
serverLocator.setReconnectAttempts(-1);
serverLocator.setInitialConnectAttempts(-1);
//this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
locators.put(name, serverLocator);
}
@ -237,7 +237,7 @@ public class ClusterController implements ActiveMQComponent {
* @return the Cluster Control
*/
public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) {
sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator()));
return new ClusterControl(sf, server);
}

View File

@ -129,7 +129,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
@Override
protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID);
setSessionFactory(factory);

View File

@ -595,7 +595,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
serverLocator.setAfterConnectionInternalListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.start(server.getExecutorFactory().getExecutor());
}
@ -760,7 +760,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
targetLocator.setAfterConnectionInternalListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
targetLocator.setNodeID(nodeId);

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@ -31,15 +34,12 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import java.util.List;
import java.util.Map;
/*
* Instead of loading into its own post office this will use its parent server (the actual live server) and load into that.
* Since the server is already running we have to make sure we don't route any message that may subsequently get deleted or acked.
@ -88,7 +88,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());

View File

@ -115,7 +115,7 @@ public class LiveOnlyActivation extends Activation {
try {
scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer);
//use a Node Locator to connect to the cluster
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator));
LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), activeMQServer);
scaleDownServerLocator.addClusterTopologyListener(nodeLocator);

View File

@ -55,5 +55,10 @@ public interface ProtocolManager<P extends BaseInterceptor> {
*/
MessageConverter getConverter();
/** If this protocols accepts connectoins without an initial handshake.
* If true this protocol will be the failback case no other conenctions are made.
* New designed protocols should always require a handshake. This is only useful for legacy protocols. */
boolean acceptsNoHandshake();
void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);
}

View File

@ -24,11 +24,15 @@ import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
@ -206,6 +210,18 @@ public abstract class ActiveMQTestBase extends Assert {
temporaryFolder = new TemporaryFolder(parent);
}
protected <T> T serialClone(Object object) throws Exception {
System.out.println("object::" + object);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream obOut = new ObjectOutputStream(bout);
obOut.writeObject(object);
ByteArrayInputStream binput = new ByteArrayInputStream(bout.toByteArray());
ObjectInputStream obinp = new ObjectInputStream(binput);
return (T) obinp.readObject();
}
@After
public void tearDown() throws Exception {
for (ExecutorService s : executorSet) {

View File

@ -75,7 +75,7 @@ public class ActiveMQXAResourceRecovery {
String username = parser.getUsername();
String password = parser.getPassword();
TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password, null);
xaRecoveryConfigs[i] = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfiguration}, username, password, null, null);
}
res = new ActiveMQXAResourceWrapper(xaRecoveryConfigs);

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
/**
* This represents the configuration of a single connection factory.
@ -42,13 +43,14 @@ public class XARecoveryConfig {
private final String username;
private final String password;
private final Map<String, String> properties;
private final ClientProtocolManagerFactory clientProtocolManager;
public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory, String userName, String password, Map<String, String> properties) {
if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) {
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties);
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
}
else {
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties);
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
}
}
@ -57,13 +59,44 @@ public class XARecoveryConfig {
final TransportConfiguration[] transportConfiguration,
final String username,
final String password,
final Map<String, String> properties) {
this.transportConfiguration = transportConfiguration;
final Map<String, String> properties,
final ClientProtocolManagerFactory clientProtocolManager) {
TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
for (int i = 0; i < transportConfiguration.length; i++) {
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
}
this.transportConfiguration = newTransportConfiguration;
this.discoveryConfiguration = null;
this.username = username;
this.password = password;
this.ha = ha;
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this.clientProtocolManager = clientProtocolManager;
}
public XARecoveryConfig(final boolean ha,
final TransportConfiguration[] transportConfiguration,
final String username,
final String password,
final Map<String, String> properties) {
this(ha, transportConfiguration, username, password, properties, null);
}
public XARecoveryConfig(final boolean ha,
final DiscoveryGroupConfiguration discoveryConfiguration,
final String username,
final String password,
final Map<String, String> properties,
final ClientProtocolManagerFactory clientProtocolManager) {
this.discoveryConfiguration = discoveryConfiguration;
this.transportConfiguration = null;
this.username = username;
this.password = password;
this.ha = ha;
this.clientProtocolManager = clientProtocolManager;
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
}
public XARecoveryConfig(final boolean ha,
@ -71,12 +104,7 @@ public class XARecoveryConfig {
final String username,
final String password,
final Map<String, String> properties) {
this.discoveryConfiguration = discoveryConfiguration;
this.transportConfiguration = null;
this.username = username;
this.password = password;
this.ha = ha;
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this(ha, discoveryConfiguration, username, password, properties, null);
}
public boolean isHA() {
@ -103,6 +131,10 @@ public class XARecoveryConfig {
return properties;
}
public ClientProtocolManagerFactory getClientProtocolManager() {
return clientProtocolManager;
}
/**
* Create a serverLocator using the configuration
*
@ -110,10 +142,10 @@ public class XARecoveryConfig {
*/
public ServerLocator createServerLocator() {
if (getDiscoveryConfiguration() != null) {
return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration());
return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager);
}
else {
return ActiveMQClient.createServerLocator(isHA(), getTransportConfig());
return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.service.extensions.tests.recovery;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.junit.Assert;
import org.junit.Test;
public class XARecoveryConfigTest {
@Test
public void testEquals() throws Exception {
String factClass = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory";
TransportConfiguration transportConfig = new TransportConfiguration(factClass, null);
XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig},
null, null, null);
TransportConfiguration transportConfig2 = new TransportConfiguration(factClass, null);
XARecoveryConfig config2 = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig2},
null, null, null);
// They are using Different names
Assert.assertNotEquals(transportConfig, transportConfig2);
Assert.assertEquals(transportConfig.newTransportConfig(""), transportConfig2.newTransportConfig(""));
// The equals here shouldn't take the name into consideration
Assert.assertEquals(config, config2);
}
@Test
public void testNotEquals() throws Exception {
String factClass = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory";
TransportConfiguration transportConfig = new TransportConfiguration(factClass, null);
XARecoveryConfig config = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig},
null, null, null);
TransportConfiguration transportConfig2 = new TransportConfiguration(factClass + "2", null);
XARecoveryConfig config2 = new XARecoveryConfig(false, new TransportConfiguration[]{transportConfig2},
null, null, null);
// They are using Different names
Assert.assertNotEquals(transportConfig, transportConfig2);
Assert.assertNotEquals(transportConfig.newTransportConfig(""), transportConfig2.newTransportConfig(""));
// The equals here shouldn't take the name into consideration
Assert.assertNotEquals(config, config2);
}
}

View File

@ -0,0 +1,110 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>jms-examples</artifactId>
<version>1.1.1-SNAPSHOT</version>
</parent>
<artifactId>interceptor-client</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JMS Interceptor Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<libList><arg>org.apache.activemq.examples.broker:interceptor:${project.version}</arg></libList>
<ignore>${noServer}</ignore>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
</configuration>
</execution>
<execution>
<id>start</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.InterceptorExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>interceptor-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,72 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Artemis JMS Interceptor Example</title>
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
<script type="text/javascript" src="../../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>JMS Interceptor Example</h1>
<pre>To run the example, simply type <b>mvn verify</b> from this directory, <br>or <b>mvn -PnoServer verify</b> if you want to start and create the server manually.</pre>
<p>This example shows you how to implement and configure a simple incoming, server-side interceptor with ActiveMQ Artemis.</p>
<p>ActiveMQ Artemis allows an application to use an interceptor to hook into the messaging system. All that needs to do is to implement the
Interceptor interface, as defined below: </p>
<pre class="prettyprint">
<code>
public interface Interceptor
{
boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException;
}
</code>
</pre>
<p>Once you have your own interceptor class, add it to the broker.xml, as follows:</p>
<pre class="prettyprint">
<code>
&lt;configuration&gt;
...
&lt;remoting-incoming-interceptors&gt;
&lt;class-name&gt;org.apache.activemq.artemis.jms.example.SimpleInterceptor&lt;/class-name&gt;
&lt;/remoting-incoming-interceptors&gt;
...
&lt;/configuration&gt;
</code>
</pre>
<p>With interceptor, you can handle various events in message processing. In this example, a simple interceptor, SimpleInterceptor, is implemented and configured.
When the example is running, the interceptor will print out each events that are passed in the interceptor. And it will add a string property to the message being
delivered. You can see that after the message is received, there will be a new string property appears in the received message.</p>
<p>With our interceptor we always return <code>true</code> from the <code>intercept</code> method. If we were
to return <code>false</code> that signifies that no more interceptors are to run or the target
is not to be called. Return <code>false</code> to abort processing of the packet.</p>
</body>
</html>

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
/**
* A simple JMS example that shows how to implement and use interceptors with ActiveMQ Artemis.
*/
public class InterceptorExample {
public static void main(final String[] args) throws Exception {
Connection connection = null;
try {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?incomingInterceptorList=" + SimpleInterceptor.class.getName());
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("exampleQueue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("This is a text message");
System.out.println("Sending message [" + message.getText() +
"] with String property: " +
message.getStringProperty("newproperty"));
producer.send(message);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
System.out.println("Received message [" + messageReceived.getText() +
"] with String property: " +
messageReceived.getStringProperty("newproperty"));
if (messageReceived.getStringProperty("newproperty") == null) {
throw new IllegalStateException("Check your configuration as the example interceptor wasn't actually called!");
}
}
finally {
if (connection != null) {
connection.close();
}
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
/**
* A simple Interceptor implementation
*/
public class SimpleInterceptor implements Interceptor {
public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
System.out.println("SimpleInterceptor gets called!");
System.out.println("Packet: " + packet.getClass().getName());
System.out.println("RemotingConnection: " + connection.getRemoteAddress());
if (packet instanceof SessionReceiveMessage) {
SessionReceiveMessage realPacket = (SessionReceiveMessage) packet;
Message msg = realPacket.getMessage();
msg.putStringProperty(new SimpleString("newproperty"), new SimpleString("Hello from interceptor!"));
}
// We return true which means "call next interceptor" (if there is one) or target.
// If we returned false, it means "abort call" - no more interceptors would be called and neither would
// the target
return true;
}
}

View File

@ -54,6 +54,7 @@ under the License.
<module>expiry</module>
<module>http-transport</module>
<module>interceptor</module>
<module>interceptor-client</module>
<module>instantiate-connection-factory</module>
<module>jms-auto-closeable</module>
<module>jms-bridge</module>
@ -112,6 +113,7 @@ under the License.
<module>expiry</module>
<module>http-transport</module>
<module>interceptor</module>
<module>interceptor-client</module>
<module>jms-auto-closeable</module>
<module>instantiate-connection-factory</module>
<module>jms-bridge</module>

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
import org.apache.activemq.artemis.jms.server.config.JMSConfiguration;
import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.ra.recovery.RecoveryManager;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class HornetQProtocolManagerTest extends ActiveMQTestBase {
ActiveMQServer server;
EmbeddedJMS embeddedJMS;
@Before
public void setUp() throws Exception {
super.setUp();
Configuration configuration = createDefaultConfig(false);
configuration.setPersistenceEnabled(false);
configuration.getAcceptorConfigurations().clear();
configuration.addAcceptorConfiguration("legacy", "tcp://localhost:61616?protocols=HORNETQ").
addAcceptorConfiguration("corepr", "tcp://localhost:61617?protocols=CORE");
configuration.addConnectorConfiguration("legacy", "tcp://localhost:61616");
JMSConfiguration jmsConfiguration = new JMSConfigurationImpl();
jmsConfiguration.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName("testQueue").setBindings("testQueue"));
embeddedJMS = new EmbeddedJMS();
embeddedJMS.setConfiguration(configuration);
embeddedJMS.setJmsConfiguration(jmsConfiguration);
embeddedJMS.start();
}
public void tearDown() throws Exception {
embeddedJMS.stop();
super.tearDown();
}
@Test
public void testLegacy() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=" + HornetQClientProtocolManagerFactory.class.getName());
connectionFactory.createConnection().close();
ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory("tcp://localhost:61617");
connectionFactory2.createConnection().close();
RecoveryManager manager = new RecoveryManager();
manager.register(connectionFactory, null, null, new ConcurrentHashMap<String, String>());
manager.register(connectionFactory2, null, null, new ConcurrentHashMap<String, String>());
for (XARecoveryConfig resource :manager.getResources()) {
ServerLocator locator = resource.createServerLocator();
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
session.close();
factory.close();
locator.close();
}
}
/** This test will use an ArtemisConnectionFactory with clientProtocolManager=*/
@Test
public void testLegacy2() throws Exception {
ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl();
configuration.setConnectorNames("legacy");
configuration.setName("legacy");
configuration.setProtocolManagerFactoryStr(HornetQClientProtocolManagerFactory.class.getName());
embeddedJMS.getJMSServerManager().createConnectionFactory(false, configuration, "legacy");
Queue queue = (Queue) embeddedJMS.lookup("testQueue");
ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) embeddedJMS.lookup("legacy");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Test");
for (int i = 0; i < 5; i++) {
message.setStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID.toString(), "duplicate");
producer.send(message);
}
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
TextMessage messageRec = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(messageRec);
Assert.assertEquals("Test", messageRec.getText());
Assert.assertNull(consumer.receiveNoWait());
connection.close();
connectionFactory.close();
}
}

View File

@ -17,6 +17,10 @@
package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -29,15 +33,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.hornetq.api.core.client.HornetQClient;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class HornetQProtocolTest extends ActiveMQTestBase {
protected ActiveMQServer server;
@ -59,6 +61,13 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
waitForServerToStart(server);
}
@After
public void tearDown() throws Exception {
org.hornetq.core.client.impl.ServerLocatorImpl.clearThreadPools();
super.tearDown();
}
@Test
public void testMessagePropertiesAreTransformedBetweenCoreAndHQProtocols() throws Exception {
org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
@ -83,6 +92,7 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
}
ClientMessage coreMessage1 = coreConsumer.receive(1000);
System.err.println("Messages::==" + coreMessage1);
assertTrue(coreMessage1.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
coreSession.close();
@ -93,6 +103,39 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
hqSession.close();
}
@Test
public void testLargeMessagesOverHornetQClients() throws Exception {
org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
// Create Queue
String queueName = "test.hq.queue";
hqSession.createQueue(queueName, queueName, true);
// HornetQ Client Objects
hqSession.start();
org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
org.hornetq.api.core.client.ClientConsumer hqConsumer = hqSession.createConsumer(queueName);
for (int i = 0; i < 2; i++) {
org.hornetq.api.core.client.ClientMessage hqMessage = hqSession.createMessage(true);
hqMessage.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
hqProducer.send(hqMessage);
}
hqSession.commit();
for (int i = 0; i < 2; i++) {
org.hornetq.api.core.client.ClientMessage coreMessage1 = hqConsumer.receive(1000);
coreMessage1.acknowledge();
System.err.println("Messages::==" + coreMessage1);
for (int j = 0; j < 10 * 1024; j++) {
Assert.assertEquals(ActiveMQTestBase.getSamplebyte(j), coreMessage1.getBodyBuffer().readByte());
}
}
hqSession.close();
}
@Test
public void testDuplicateIDPropertyWithHornetQProtocol() throws Exception {
org.hornetq.api.core.client.ClientSession session = createHQClientSession();
@ -158,14 +201,14 @@ public class HornetQProtocolTest extends ActiveMQTestBase {
}
private org.hornetq.api.core.client.ClientMessage createHQTestMessage(org.hornetq.api.core.client.ClientSession session) {
org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
org.hornetq.api.core.client.ClientMessage message = session.createMessage(true);
String v = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
return message;
}
private ClientMessage createCoreTestMessage(ClientSession session) {
ClientMessage message = session.createMessage(false);
ClientMessage message = session.createMessage(true);
String v = UUID.randomUUID().toString();
message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
return message;

View File

@ -55,7 +55,7 @@ public class ClusterControllerTest extends ClusterTestBase {
@Test
public void controlWithDifferentConnector() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
ClusterController controller = new ClusterController(getServer(0), getServer(0).getScheduledPool());
ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
clusterControl.authorize();
@ -65,7 +65,7 @@ public class ClusterControllerTest extends ClusterTestBase {
@Test
public void controlWithDifferentPassword() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
ClusterController controller = new ClusterController(getServer(1), getServer(1).getScheduledPool());
ClusterControl clusterControl = controller.connectToNodeInCluster((ClientSessionFactoryInternal) locator.createSessionFactory());
try {

View File

@ -1329,7 +1329,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
locators[node] = ActiveMQClient.createServerLocatorWithoutHA(serverTotc);
}
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node]));
locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
addServerLocator(locators[node]);

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.interceptors;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class Incoming implements Interceptor {
public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
System.out.println("Incoming:Packet : " + packet);
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG) {
SessionReceiveMessage p = (SessionReceiveMessage) packet;
p.getMessage().putStringProperty("Incoming", "was here");
}
return true;
}
}

View File

@ -14,7 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration;
package org.apache.activemq.artemis.tests.integration.interceptors;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -40,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -1027,4 +1038,52 @@ public class InterceptorTest extends ActiveMQTestBase {
session.close();
}
@Test
public void testInterceptorOnURI() throws Exception {
locator.close();
String uri = "tcp://localhost:61616?incomingInterceptorList=" + Incoming.class.getCanonicalName() + "&outgoingInterceptorList=" + Outgoing.class.getName();
System.out.println(uri);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
// Serialize stuff to make sure the interceptors are on the URI
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.close();
objectOutputStream.writeObject(factory);
ByteArrayInputStream input = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
ObjectInputStream objectInputStream = new ObjectInputStream(input);
factory = (ActiveMQConnectionFactory) objectInputStream.readObject();
Connection connection = factory.createConnection();
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue(QUEUE.toString()));
producer.send(session.createTextMessage("HelloMessage"));
connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE.toString()));
TextMessage msg = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(consumer);
Assert.assertEquals("HelloMessage", msg.getText());
Assert.assertEquals("was here", msg.getStringProperty("Incoming"));
Assert.assertEquals("sending", msg.getStringProperty("Outgoing"));
connection.close();
factory.close();
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.interceptors;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class Outgoing implements Interceptor {
public boolean intercept(final Packet packet, final RemotingConnection connection) throws ActiveMQException {
System.out.println("Outgoin:Packet : " + packet);
if (packet.getType() == PacketImpl.SESS_SEND) {
SessionSendMessage p = (SessionSendMessage) packet;
p.getMessage().putStringProperty("Outgoing", "sending");
}
return true;
}
}

View File

@ -44,19 +44,32 @@ public class ConnectionTest extends JMSTestBase {
@Test
public void testThroughNewConnectionFactory() throws Exception {
testThroughNewConnectionFactory(new ActiveMQConnectionFactory("vm://0"));
testThroughNewConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616?&blockOnNonDurableSend=true&" +
"retryIntervalMultiplier=1.0&maxRetryInterval=2000&producerMaxRate=-1&" +
"blockOnDurableSend=true&connectionTTL=60000&compressLargeMessage=false&reconnectAttempts=0&" +
"cacheLargeMessagesClient=false&scheduledThreadPoolMaxSize=5&useGlobalPools=true&" +
"callFailoverTimeout=-1&initialConnectAttempts=1&clientFailureCheckPeriod=30000&" +
"blockOnAcknowledge=true&consumerWindowSize=1048576&minLargeMessageSize=102400&" +
"autoGroup=false&threadPoolMaxSize=-1&confirmationWindowSize=-1&" +
"transactionBatchSize=1048576&callTimeout=30000&preAcknowledge=false&" +
"connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance." +
"RoundRobinConnectionLoadBalancingPolicy&dupsOKBatchSize=1048576&initialMessagePacketSize=1500&" +
"consumerMaxRate=-1&retryInterval=2000&failoverOnInitialConnection=false&producerWindowSize=65536&" +
"port=61616&host=localhost#"));
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
testThroughNewConnectionFactory(connectionFactory);
// Run it again with a cloned through serialization CF, simulating JNDI lookups
connectionFactory = serialClone(connectionFactory);
testThroughNewConnectionFactory(connectionFactory);
connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?&blockOnNonDurableSend=true&" +
"retryIntervalMultiplier=1.0&maxRetryInterval=2000&producerMaxRate=-1&" +
"blockOnDurableSend=true&connectionTTL=60000&compressLargeMessage=false&reconnectAttempts=0&" +
"cacheLargeMessagesClient=false&scheduledThreadPoolMaxSize=5&useGlobalPools=true&" +
"callFailoverTimeout=-1&initialConnectAttempts=1&clientFailureCheckPeriod=30000&" +
"blockOnAcknowledge=true&consumerWindowSize=1048576&minLargeMessageSize=102400&" +
"autoGroup=false&threadPoolMaxSize=-1&confirmationWindowSize=-1&" +
"transactionBatchSize=1048576&callTimeout=30000&preAcknowledge=false&" +
"connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance." +
"RoundRobinConnectionLoadBalancingPolicy&dupsOKBatchSize=1048576&initialMessagePacketSize=1500&" +
"consumerMaxRate=-1&retryInterval=2000&failoverOnInitialConnection=false&producerWindowSize=65536&" +
"port=61616&host=localhost#");
testThroughNewConnectionFactory(connectionFactory);
// Run it again with a cloned through serialization CF, simulating JNDI lookups
connectionFactory = serialClone(connectionFactory);
testThroughNewConnectionFactory(connectionFactory);
}
private void testThroughNewConnectionFactory(ActiveMQConnectionFactory factory) throws Exception {

View File

@ -384,6 +384,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase {
" <config-property-name>JgroupsChannelRefName</config-property-name>" +
" <config-property-type>java.lang.String</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>" +
" <config-property>" +
" <description>ProtocolManagerConfig</description>" +
" <config-property-name>ProtocolManagerFactoryStr</config-property-name>" +
" <config-property-type>java.lang.String</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>";
private static String rootConfig = "<root>" + config + commentedOutConfigs + "</root>";

View File

@ -41,6 +41,8 @@ public class ConnectionFactoryPropertiesTest extends ActiveMQTestBase {
static {
UNSUPPORTED_CF_PROPERTIES = new TreeSet<String>();
UNSUPPORTED_CF_PROPERTIES.add("discoveryGroupName");
UNSUPPORTED_CF_PROPERTIES.add("incomingInterceptorList");
UNSUPPORTED_CF_PROPERTIES.add("outgoingInterceptorList");
UNSUPPORTED_RA_PROPERTIES = new TreeSet<String>();
UNSUPPORTED_RA_PROPERTIES.add("HA");