diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java new file mode 100644 index 0000000000..c530ab783c --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -0,0 +1,995 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.RejectedExecutionHandler; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.naming.Context; + +import org.apache.activemq.blob.BlobTransferPolicy; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; +import org.apache.activemq.jndi.JNDIBaseStorable; +import org.apache.activemq.management.JMSStatsImpl; +import org.apache.activemq.management.StatsCapable; +import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.util.URISupport.CompositeData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable { + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class); + private static final String DEFAULT_BROKER_HOST; + private static final int DEFAULT_BROKER_PORT; + private static URI defaultTcpUri; + static{ + String host = null; + String port = null; + try { + host = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public String run() { + String result = System.getProperty("org.apache.activemq.AMQ_HOST"); + result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_HOST","localhost") : result; + return result; + } + }); + port = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public String run() { + String result = System.getProperty("org.apache.activemq.AMQ_PORT"); + result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_PORT","61616") : result; + return result; + } + }); + }catch(Throwable e){ + LOG.debug("Failed to look up System properties for host and port",e); + } + host = (host == null || host.isEmpty()) ? "localhost" : host; + port = (port == null || port.isEmpty()) ? "61616" : port; + DEFAULT_BROKER_HOST = host; + DEFAULT_BROKER_PORT = Integer.parseInt(port); + } + + + public static final String DEFAULT_BROKER_BIND_URL; + + static{ + final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; + String bindURL = null; + + try { + bindURL = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public String run() { + String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL"); + result = (result==null||result.isEmpty()) ? System.getProperty("BROKER_BIND_URL",defaultURL) : result; + return result; + } + }); + }catch(Throwable e){ + LOG.debug("Failed to look up System properties for host and port",e); + } + bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL; + DEFAULT_BROKER_BIND_URL = bindURL; + try { + defaultTcpUri = new URI(defaultURL); + } catch (URISyntaxException e) { + LOG.debug("Failed to build default tcp url",e); + } + + } + + public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; + public static final String DEFAULT_USER = null; + public static final String DEFAULT_PASSWORD = null; + public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; + + protected URI brokerURL; + protected URI vmBrokerUri; + protected String userName; + protected String password; + protected String clientID; + protected boolean dispatchAsync=true; + protected boolean alwaysSessionAsync=true; + + JMSStatsImpl factoryStats = new JMSStatsImpl(); + + private IdGenerator clientIdGenerator; + private String clientIDPrefix; + private IdGenerator connectionIdGenerator; + private String connectionIDPrefix; + + // client policies + private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); + { + redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy()); + } + private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); + private MessageTransformer transformer; + + private boolean disableTimeStampsByDefault; + private boolean optimizedMessageDispatch = true; + private long optimizeAcknowledgeTimeOut = 300; + private long optimizedAckScheduledAckInterval = 0; + private boolean copyMessageOnSend = true; + private boolean useCompression; + private boolean objectMessageSerializationDefered; + private boolean useAsyncSend; + private boolean optimizeAcknowledge; + private int closeTimeout = 15000; + private boolean useRetroactiveConsumer; + private boolean exclusiveConsumer; + private boolean nestedMapAndListEnabled = true; + private boolean alwaysSyncSend; + private boolean watchTopicAdvisories = true; + private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; + private long warnAboutUnstartedConnectionTimeout = 500L; + private int sendTimeout = 0; + private boolean sendAcksAsync=true; + private TransportListener transportListener; + private ExceptionListener exceptionListener; + private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; + private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; + private boolean useDedicatedTaskRunner; + private long consumerFailoverRedeliveryWaitPeriod = 0; + private boolean checkForDuplicates = true; + private ClientInternalExceptionListener clientInternalExceptionListener; + private boolean messagePrioritySupported = false; + private boolean transactedIndividualAck = false; + private boolean nonBlockingRedelivery = false; + private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE; + private TaskRunnerFactory sessionTaskRunner; + private RejectedExecutionHandler rejectedTaskHandler = null; + protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class + private boolean rmIdFromConnectionId = false; + private boolean consumerExpiryCheckEnabled = true; + + // ///////////////////////////////////////////// + // + // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods + // + // ///////////////////////////////////////////// + + public ActiveMQConnectionFactory() { + this(DEFAULT_BROKER_URL); + } + + public ActiveMQConnectionFactory(String brokerURL) { + this(createURI(brokerURL)); + try + { + URI uri = new URI(brokerURL); + String scheme = uri.getScheme(); + if ("vm".equals(scheme)) { + Map params = URISupport.parseParameters(uri); + params.clear(); + + this.vmBrokerUri = URISupport.createRemainingURI(uri, params);; + } + } + catch (URISyntaxException e) + { + } + + } + + public ActiveMQConnectionFactory(URI brokerURL) { + setBrokerURL(brokerURL.toString()); + } + + public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { + setUserName(userName); + setPassword(password); + setBrokerURL(brokerURL.toString()); + } + + public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { + setUserName(userName); + setPassword(password); + setBrokerURL(brokerURL); + } + + public ActiveMQConnectionFactory copy() { + try { + return (ActiveMQConnectionFactory)super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("This should never happen: " + e, e); + } + } + + private static URI createURI(String brokerURL) { + try { + URI uri = new URI(brokerURL); + String scheme = uri.getScheme(); + if ("vm".equals(scheme)) { + Map params = URISupport.parseParameters(uri); + params.put("invmBrokerId", uri.getHost() == null ? "localhost" : uri.getHost()); + defaultTcpUri = URISupport.createRemainingURI(defaultTcpUri, params); + return defaultTcpUri; + } + return uri; + } catch (URISyntaxException e) { + throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e); + } + } + + @Override + public Connection createConnection() throws JMSException { + return createActiveMQConnection(); + } + + @Override + public Connection createConnection(String userName, String password) throws JMSException { + return createActiveMQConnection(userName, password); + } + + @Override + public QueueConnection createQueueConnection() throws JMSException { + return createActiveMQConnection().enforceQueueOnlyConnection(); + } + + @Override + public QueueConnection createQueueConnection(String userName, String password) throws JMSException { + return createActiveMQConnection(userName, password).enforceQueueOnlyConnection(); + } + + @Override + public TopicConnection createTopicConnection() throws JMSException { + return createActiveMQConnection(); + } + + @Override + public TopicConnection createTopicConnection(String userName, String password) throws JMSException { + return createActiveMQConnection(userName, password); + } + + @Override + public StatsImpl getStats() { + return this.factoryStats; + } + + // ///////////////////////////////////////////// + // + // Implementation methods. + // + // ///////////////////////////////////////////// + + protected ActiveMQConnection createActiveMQConnection() throws JMSException { + return createActiveMQConnection(userName, password); + } + + protected Transport createTransport() throws JMSException { + try { + System.out.println("xxxxxcreating conn: " + brokerURL.toString()); + Transport t = TransportFactory.connect(brokerURL); + System.out.println("xxxxxxxxxxxx created transport" + t); + return t; + } catch (Exception e) { + throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); + } + } + + protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { + if (brokerURL == null) { + throw new ConfigurationException("brokerURL not set."); + } + ActiveMQConnection connection = null; + try { + Transport transport = createTransport(); + connection = createActiveMQConnection(transport, factoryStats); + + connection.setUserName(userName); + connection.setPassword(password); + + configureConnection(connection); + + transport.start(); + + if (clientID != null) { + connection.setDefaultClientID(clientID); + } + + return connection; + } catch (JMSException e) { + // Clean up! + try { + connection.close(); + } catch (Throwable ignore) { + } + throw e; + } catch (Exception e) { + // Clean up! + try { + connection.close(); + } catch (Throwable ignore) { + } + throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); + } + } + + protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { + ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), + getConnectionIdGenerator(), stats); + return connection; + } + + protected void configureConnection(ActiveMQConnection connection) throws JMSException { + connection.setPrefetchPolicy(getPrefetchPolicy()); + connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); + connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); + connection.setCopyMessageOnSend(isCopyMessageOnSend()); + connection.setUseCompression(isUseCompression()); + connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); + connection.setDispatchAsync(isDispatchAsync()); + connection.setUseAsyncSend(isUseAsyncSend()); + connection.setAlwaysSyncSend(isAlwaysSyncSend()); + connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); + connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); + connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut()); + connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval()); + connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); + connection.setExclusiveConsumer(isExclusiveConsumer()); + connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap()); + connection.setTransformer(getTransformer()); + connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); + connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); + connection.setProducerWindowSize(getProducerWindowSize()); + connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); + connection.setSendTimeout(getSendTimeout()); + connection.setCloseTimeout(getCloseTimeout()); + connection.setSendAcksAsync(isSendAcksAsync()); + connection.setAuditDepth(getAuditDepth()); + connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); + connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); + connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); + connection.setCheckForDuplicates(isCheckForDuplicates()); + connection.setMessagePrioritySupported(isMessagePrioritySupported()); + connection.setTransactedIndividualAck(isTransactedIndividualAck()); + connection.setNonBlockingRedelivery(isNonBlockingRedelivery()); + connection.setMaxThreadPoolSize(getMaxThreadPoolSize()); + connection.setSessionTaskRunner(getSessionTaskRunner()); + connection.setRejectedTaskHandler(getRejectedTaskHandler()); + connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled()); + connection.setRmIdFromConnectionId(isRmIdFromConnectionId()); + connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled()); + if (transportListener != null) { + connection.addTransportListener(transportListener); + } + if (exceptionListener != null) { + connection.setExceptionListener(exceptionListener); + } + if (clientInternalExceptionListener != null) { + connection.setClientInternalExceptionListener(clientInternalExceptionListener); + } + } + + // ///////////////////////////////////////////// + // + // Property Accessors + // + // ///////////////////////////////////////////// + + public String getBrokerURL() { + System.out.println("vm uri: " + vmBrokerUri); + if (vmBrokerUri != null) return vmBrokerUri.toString(); + return brokerURL == null ? null : brokerURL.toString(); + } + + public void setBrokerURL(String brokerURL) { + URI uri = null; + try + { + uri = new URI(brokerURL); + String scheme = uri.getScheme(); + if ("vm".equals(scheme)) { + this.vmBrokerUri = uri; + } + } + catch (URISyntaxException e) + { + } + this.brokerURL = createURI(brokerURL); + + // Use all the properties prefixed with 'jms.' to set the connection + // factory + // options. + if (this.brokerURL.getQuery() != null) { + // It might be a standard URI or... + try { + + Map map = URISupport.parseQuery(this.brokerURL.getQuery()); + Map jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms."); + if (buildFromMap(jmsOptionsMap)) { + if (!jmsOptionsMap.isEmpty()) { + String msg = "There are " + jmsOptionsMap.size() + + " jms options that couldn't be set on the ConnectionFactory." + + " Check the options are spelled correctly." + + " Unknown parameters=[" + jmsOptionsMap + "]." + + " This connection factory cannot be started."; + throw new IllegalArgumentException(msg); + } + + this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); + } + + } catch (URISyntaxException e) { + } + + } else { + + // It might be a composite URI. + try { + CompositeData data = URISupport.parseComposite(this.brokerURL); + Map jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms."); + if (buildFromMap(jmsOptionsMap)) { + if (!jmsOptionsMap.isEmpty()) { + String msg = "There are " + jmsOptionsMap.size() + + " jms options that couldn't be set on the ConnectionFactory." + + " Check the options are spelled correctly." + + " Unknown parameters=[" + jmsOptionsMap + "]." + + " This connection factory cannot be started."; + throw new IllegalArgumentException(msg); + } + + this.brokerURL = data.toURI(); + } + } catch (URISyntaxException e) { + } + } + } + + public String getClientID() { + return clientID; + } + + public void setClientID(String clientID) { + this.clientID = clientID; + } + + public boolean isCopyMessageOnSend() { + return copyMessageOnSend; + } + + public void setCopyMessageOnSend(boolean copyMessageOnSend) { + this.copyMessageOnSend = copyMessageOnSend; + } + + public boolean isDisableTimeStampsByDefault() { + return disableTimeStampsByDefault; + } + + public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { + this.disableTimeStampsByDefault = disableTimeStampsByDefault; + } + + public boolean isOptimizedMessageDispatch() { + return optimizedMessageDispatch; + } + + public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { + this.optimizedMessageDispatch = optimizedMessageDispatch; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public ActiveMQPrefetchPolicy getPrefetchPolicy() { + return prefetchPolicy; + } + + public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { + this.prefetchPolicy = prefetchPolicy; + } + + public boolean isUseAsyncSend() { + return useAsyncSend; + } + + public BlobTransferPolicy getBlobTransferPolicy() { + return blobTransferPolicy; + } + + public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { + this.blobTransferPolicy = blobTransferPolicy; + } + + public void setUseAsyncSend(boolean useAsyncSend) { + this.useAsyncSend = useAsyncSend; + } + + public synchronized boolean isWatchTopicAdvisories() { + return watchTopicAdvisories; + } + + public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { + this.watchTopicAdvisories = watchTopicAdvisories; + } + + public boolean isAlwaysSyncSend() { + return this.alwaysSyncSend; + } + + public void setAlwaysSyncSend(boolean alwaysSyncSend) { + this.alwaysSyncSend = alwaysSyncSend; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public boolean isUseRetroactiveConsumer() { + return useRetroactiveConsumer; + } + + public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { + this.useRetroactiveConsumer = useRetroactiveConsumer; + } + + public boolean isExclusiveConsumer() { + return exclusiveConsumer; + } + + public void setExclusiveConsumer(boolean exclusiveConsumer) { + this.exclusiveConsumer = exclusiveConsumer; + } + + public RedeliveryPolicy getRedeliveryPolicy() { + return redeliveryPolicyMap.getDefaultEntry(); + } + + public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { + this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy); + } + + public RedeliveryPolicyMap getRedeliveryPolicyMap() { + return this.redeliveryPolicyMap; + } + + public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { + this.redeliveryPolicyMap = redeliveryPolicyMap; + } + + public MessageTransformer getTransformer() { + return transformer; + } + + public int getSendTimeout() { + return sendTimeout; + } + + public void setSendTimeout(int sendTimeout) { + this.sendTimeout = sendTimeout; + } + + public boolean isSendAcksAsync() { + return sendAcksAsync; + } + + public void setSendAcksAsync(boolean sendAcksAsync) { + this.sendAcksAsync = sendAcksAsync; + } + + public boolean isMessagePrioritySupported() { + return this.messagePrioritySupported; + } + + public void setMessagePrioritySupported(boolean messagePrioritySupported) { + this.messagePrioritySupported = messagePrioritySupported; + } + + + public void setTransformer(MessageTransformer transformer) { + this.transformer = transformer; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void buildFromProperties(Properties properties) { + + if (properties == null) { + properties = new Properties(); + } + + String temp = properties.getProperty(Context.PROVIDER_URL); + if (temp == null || temp.length() == 0) { + temp = properties.getProperty("brokerURL"); + } + if (temp != null && temp.length() > 0) { + setBrokerURL(temp); + } + + Map p = new HashMap(properties); + buildFromMap(p); + } + + public boolean buildFromMap(Map properties) { + boolean rc = false; + + ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); + if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) { + setPrefetchPolicy(p); + rc = true; + } + + RedeliveryPolicy rp = new RedeliveryPolicy(); + if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) { + setRedeliveryPolicy(rp); + rc = true; + } + + BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); + if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) { + setBlobTransferPolicy(blobTransferPolicy); + rc = true; + } + + rc |= IntrospectionSupport.setProperties(this, properties); + + return rc; + } + + @Override + public void populateProperties(Properties props) { + props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); + + if (getBrokerURL() != null) { + props.setProperty(Context.PROVIDER_URL, getBrokerURL()); + props.setProperty("brokerURL", getBrokerURL()); + } + + if (getClientID() != null) { + props.setProperty("clientID", getClientID()); + } + + IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); + IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); + IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); + + props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); + props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); + props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); + props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); + + if (getPassword() != null) { + props.setProperty("password", getPassword()); + } + + props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); + props.setProperty("useCompression", Boolean.toString(isUseCompression())); + props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); + props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); + + if (getUserName() != null) { + props.setProperty("userName", getUserName()); + } + + props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); + props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); + props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); + props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); + props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); + props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); + props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); + props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); + props.setProperty("auditDepth", Integer.toString(getAuditDepth())); + props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); + props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); + props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); + props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck())); + props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery())); + props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize())); + props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled())); + props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod())); + props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId())); + props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled())); + } + + public boolean isUseCompression() { + return useCompression; + } + + public void setUseCompression(boolean useCompression) { + this.useCompression = useCompression; + } + + public boolean isObjectMessageSerializationDefered() { + return objectMessageSerializationDefered; + } + + public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { + this.objectMessageSerializationDefered = objectMessageSerializationDefered; + } + + public boolean isDispatchAsync() { + return dispatchAsync; + } + + public void setDispatchAsync(boolean asyncDispatch) { + this.dispatchAsync = asyncDispatch; + } + + public int getCloseTimeout() { + return closeTimeout; + } + + public void setCloseTimeout(int closeTimeout) { + this.closeTimeout = closeTimeout; + } + + public boolean isAlwaysSessionAsync() { + return alwaysSessionAsync; + } + + public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { + this.alwaysSessionAsync = alwaysSessionAsync; + } + + public boolean isOptimizeAcknowledge() { + return optimizeAcknowledge; + } + + public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { + this.optimizeAcknowledge = optimizeAcknowledge; + } + + public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { + this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; + } + + public long getOptimizeAcknowledgeTimeOut() { + return optimizeAcknowledgeTimeOut; + } + + public boolean isNestedMapAndListEnabled() { + return nestedMapAndListEnabled; + } + + public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { + this.nestedMapAndListEnabled = structuredMapsEnabled; + } + + public String getClientIDPrefix() { + return clientIDPrefix; + } + + public void setClientIDPrefix(String clientIDPrefix) { + this.clientIDPrefix = clientIDPrefix; + } + + protected synchronized IdGenerator getClientIdGenerator() { + if (clientIdGenerator == null) { + if (clientIDPrefix != null) { + clientIdGenerator = new IdGenerator(clientIDPrefix); + } else { + clientIdGenerator = new IdGenerator(); + } + } + return clientIdGenerator; + } + + protected void setClientIdGenerator(IdGenerator clientIdGenerator) { + this.clientIdGenerator = clientIdGenerator; + } + + public void setConnectionIDPrefix(String connectionIDPrefix) { + this.connectionIDPrefix = connectionIDPrefix; + } + + protected synchronized IdGenerator getConnectionIdGenerator() { + if (connectionIdGenerator == null) { + if (connectionIDPrefix != null) { + connectionIdGenerator = new IdGenerator(connectionIDPrefix); + } else { + connectionIdGenerator = new IdGenerator(); + } + } + return connectionIdGenerator; + } + + protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) { + this.connectionIdGenerator = connectionIdGenerator; + } + + public boolean isStatsEnabled() { + return this.factoryStats.isEnabled(); + } + + public void setStatsEnabled(boolean statsEnabled) { + this.factoryStats.setEnabled(statsEnabled); + } + + public synchronized int getProducerWindowSize() { + return producerWindowSize; + } + + public synchronized void setProducerWindowSize(int producerWindowSize) { + this.producerWindowSize = producerWindowSize; + } + + public long getWarnAboutUnstartedConnectionTimeout() { + return warnAboutUnstartedConnectionTimeout; + } + + public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { + this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; + } + + public TransportListener getTransportListener() { + return transportListener; + } + + public void setTransportListener(TransportListener transportListener) { + this.transportListener = transportListener; + } + + + public ExceptionListener getExceptionListener() { + return exceptionListener; + } + + public void setExceptionListener(ExceptionListener exceptionListener) { + this.exceptionListener = exceptionListener; + } + + public int getAuditDepth() { + return auditDepth; + } + + public void setAuditDepth(int auditDepth) { + this.auditDepth = auditDepth; + } + + public int getAuditMaximumProducerNumber() { + return auditMaximumProducerNumber; + } + + public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { + this.auditMaximumProducerNumber = auditMaximumProducerNumber; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } + + public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { + this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; + } + + public long getConsumerFailoverRedeliveryWaitPeriod() { + return consumerFailoverRedeliveryWaitPeriod; + } + + public ClientInternalExceptionListener getClientInternalExceptionListener() { + return clientInternalExceptionListener; + } + + public void setClientInternalExceptionListener( + ClientInternalExceptionListener clientInternalExceptionListener) { + this.clientInternalExceptionListener = clientInternalExceptionListener; + } + + public boolean isCheckForDuplicates() { + return this.checkForDuplicates; + } + + public void setCheckForDuplicates(boolean checkForDuplicates) { + this.checkForDuplicates = checkForDuplicates; + } + + public boolean isTransactedIndividualAck() { + return transactedIndividualAck; + } + + public void setTransactedIndividualAck(boolean transactedIndividualAck) { + this.transactedIndividualAck = transactedIndividualAck; + } + + + public boolean isNonBlockingRedelivery() { + return nonBlockingRedelivery; + } + + public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { + this.nonBlockingRedelivery = nonBlockingRedelivery; + } + + public int getMaxThreadPoolSize() { + return maxThreadPoolSize; + } + + public void setMaxThreadPoolSize(int maxThreadPoolSize) { + this.maxThreadPoolSize = maxThreadPoolSize; + } + + public TaskRunnerFactory getSessionTaskRunner() { + return sessionTaskRunner; + } + + public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { + this.sessionTaskRunner = sessionTaskRunner; + } + + public RejectedExecutionHandler getRejectedTaskHandler() { + return rejectedTaskHandler; + } + + public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { + this.rejectedTaskHandler = rejectedTaskHandler; + } + + public long getOptimizedAckScheduledAckInterval() { + return optimizedAckScheduledAckInterval; + } + + public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) { + this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; + } + + + public boolean isRmIdFromConnectionId() { + return rmIdFromConnectionId; + } + + public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { + this.rmIdFromConnectionId = rmIdFromConnectionId; + } + + public boolean isConsumerExpiryCheckEnabled() { + return consumerExpiryCheckEnabled; + } + + public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { + this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; + } +} diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java index 8d5cdabc38..d58688627b 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java @@ -21,6 +21,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URI; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; public class ArtemisBrokerHelper { @@ -72,5 +73,24 @@ public class ArtemisBrokerHelper { service = startedBroker; } + public static BrokerService getBroker() { + return (BrokerService)service; + } + + public static void stopArtemisBroker() throws Exception + { + try + { + if (service != null) + { + Method startMethod = serviceClass.getMethod("stop"); + startMethod.invoke(service, (Object[]) null); + } + } + finally + { + service = null; + } + } } diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java index 34dc6e4757..75eff1fcf1 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -194,11 +194,13 @@ public class BrokerService implements Service @Override public void stop() throws Exception { + System.out.println("broker is: " + broker); LOG.info("Apache ActiveMQ Artemis{} ({}, {}) is shutting down", new Object[]{getBrokerVersion(), getBrokerName(), brokerId}); if (broker != null) { + System.out.println("______________________stopping broker: " + broker.getClass().getName()); broker.stop(); broker = null; } @@ -566,7 +568,10 @@ public class BrokerService implements Service public TransportConnector addConnector(URI bindAddress) throws Exception { Integer port = bindAddress.getPort(); - this.extraConnectors.add(port); + if (port != 0) + { + this.extraConnectors.add(port); + } return null; } @@ -719,6 +724,10 @@ public class BrokerService implements Service return null; } + public String getDefaultUri() + { + return "tcp://localhost:61616"; + } } diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java index 227ad1b10e..52f1f42e8c 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java @@ -507,7 +507,7 @@ public abstract class ArtemisBrokerBase implements Broker { throws Exception { if (netty) { return createDefaultConfig(new HashMap(), - INVM_ACCEPTOR_FACTORY, NETTY_ACCEPTOR_FACTORY); + NETTY_ACCEPTOR_FACTORY); } else { return createDefaultConfig(new HashMap(), INVM_ACCEPTOR_FACTORY); diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index ced78570b5..811d6e5fec 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -58,7 +58,8 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { testDir = temporaryFolder.getRoot().getAbsolutePath(); clearDataRecreateServerDirs(); - server = createServer(realStore, false); + server = createServer(realStore, true); + server.getConfiguration().getAcceptorConfigurations().clear(); HashMap params = new HashMap(); params.put(TransportConstants.PORT_PROP_NAME, "61616"); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE"); @@ -66,8 +67,6 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase Configuration serverConfig = server.getConfiguration(); - Set acceptors0 = serverConfig.getAcceptorConfigurations(); - Map addressSettingsMap = serverConfig.getAddressesSettings(); //do policy translation @@ -170,18 +169,18 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase anySet.add(destRole); } + Set acceptors = serverConfig.getAcceptorConfigurations(); + Iterator iter = acceptors.iterator(); + while (iter.hasNext()) + { + System.out.println("acceptor =>: " + iter.next()); + } + jmsServer = new JMSServerManagerImpl(server); InVMNamingContext namingContext = new InVMNamingContext(); jmsServer.setRegistry(new JndiBindingRegistry(namingContext)); jmsServer.start(); - Set acceptors = serverConfig.getAcceptorConfigurations(); - Iterator iter = acceptors.iterator(); - - while (iter.hasNext()) - { - System.out.println("acceptor =>: " + iter.next()); - } server.start(); /* diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java new file mode 100644 index 0000000000..959dea4abe --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; + +import org.apache.activemq.TransportLoggerSupport; +import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.*; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TcpTransportFactory extends TransportFactory { + private static final Logger LOG = LoggerFactory.getLogger(TcpTransportFactory.class); + + private static volatile String brokerService = null; + + //if a broker is started or stopped it should set this. + public static void setBrokerName(String name) { + brokerService = name; + } + + @Override + public Transport doConnect(URI location) throws Exception { + //here check broker, if no broker, we start one + Map params = URISupport.parseParameters(location); + String brokerId = params.remove("invmBrokerId"); + params.clear(); + location = URISupport.createRemainingURI(location, params); + if (brokerService == null) { + + ArtemisBrokerHelper.startArtemisBroker(location); + brokerService = location.toString(); + + if (brokerId != null) { + BrokerRegistry.getInstance().bind(brokerId, ArtemisBrokerHelper.getBroker()); + System.out.println("bound: " + brokerId); + } + } + return super.doConnect(location); + } + + public TransportServer doBind(final URI location) throws IOException { + try { + Map options = new HashMap(URISupport.parseParameters(location)); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); + server.setWireFormatFactory(createWireFormatFactory(options)); + IntrospectionSupport.setProperties(server, options); + Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); + server.setTransportOption(transportOptions); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory); + } + + @SuppressWarnings("rawtypes") + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + + TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class); + IntrospectionSupport.setProperties(tcpTransport, options); + + Map socketOptions = IntrospectionSupport.extractProperties(options, "socket."); + tcpTransport.setSocketOptions(socketOptions); + + if (tcpTransport.isTrace()) { + try { + transport = TransportLoggerSupport.createTransportLogger(transport, tcpTransport.getLogWriterName(), tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort()); + } catch (Throwable e) { + LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e); + } + } + + boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true")); + if (useInactivityMonitor && isUseInactivityMonitor(transport)) { + transport = createInactivityMonitor(transport, format); + IntrospectionSupport.setProperties(transport, options); + } + + // Only need the WireFormatNegotiator if using openwire + if (format instanceof OpenWireFormat) { + transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); + } + + return super.compositeConfigure(transport, format, options); + } + + protected boolean isUseInactivityMonitor(Transport transport) { + return true; + } + + protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { + URI localLocation = null; + String path = location.getPath(); + // see if the path is a local URI location + if (path != null && path.length() > 0) { + int localPortIndex = path.indexOf(':'); + try { + Integer.parseInt(path.substring(localPortIndex + 1, path.length())); + String localString = location.getScheme() + ":/" + path; + localLocation = new URI(localString); + } catch (Exception e) { + LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage()); + if(LOG.isDebugEnabled()) { + LOG.debug("Failure detail", e); + } + } + } + SocketFactory socketFactory = createSocketFactory(); + return createTcpTransport(wf, socketFactory, location, localLocation); + } + + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new TcpTransport(wf, socketFactory, location, localLocation); + } + + protected ServerSocketFactory createServerSocketFactory() throws IOException { + return ServerSocketFactory.getDefault(); + } + + protected SocketFactory createSocketFactory() throws IOException { + return SocketFactory.getDefault(); + } + + protected Transport createInactivityMonitor(Transport transport, WireFormat format) { + return new InactivityMonitor(transport, format); + } + + public static void clearService() + { + brokerService = null; + } +} diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java index 353f1d3262..bb1776cfb8 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java @@ -27,9 +27,11 @@ import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Session; +import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +67,11 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport { broker.stop(); } catch (Throwable ignore) { } + try { + ArtemisBrokerHelper.stopArtemisBroker(); + } catch (Throwable ignore) { + } + TcpTransportFactory.clearService(); } public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException { @@ -120,6 +127,7 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport { } public void testGetBrokerName() throws URISyntaxException, JMSException { + System.out.println("------------------beging testing..............."); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); connection = (ActiveMQConnection)cf.createConnection(); connection.start(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java index 77f422e159..c3fd2d0f6d 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java @@ -24,8 +24,10 @@ import javax.jms.Session; import junit.framework.TestCase; +import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +55,8 @@ public class ActiveMQInputStreamTest extends TestCase { broker.start(); broker.waitUntilStarted(); - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + //some internal api we don't implement + connectionUri = broker.getDefaultUri(); } @Override