From 6f2ac637cd631c676903149ba0247b71a9a510cb Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Thu, 20 Sep 2012 10:48:56 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-4066 Remove old deprecated code. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1387960 13f79535-47bb-0310-9956-ffa450edef68 --- .../filter/AmqMessagesQueryFilter.java | 26 +- .../activemq/broker/TransportConnector.java | 42 +-- .../apache/activemq/command/ConsumerInfo.java | 62 ++--- .../org/apache/activemq/thread/Valve.java | 122 --------- .../activemq/transport/TransportFactory.java | 10 +- .../transport/tcp/SslTransportFactory.java | 25 -- .../activemq/xbean/XBeanBrokerService.java | 34 +-- .../transport/http/HttpTransport.java | 240 ------------------ .../transport/https/HttpsTransport.java | 58 ----- .../xstream/XStreamMessageTransformer.java | 207 --------------- 10 files changed, 44 insertions(+), 782 deletions(-) delete mode 100755 activemq-core/src/main/java/org/apache/activemq/thread/Valve.java delete mode 100755 activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java delete mode 100755 activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransport.java delete mode 100644 activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java diff --git a/activemq-console/src/main/java/org/apache/activemq/console/filter/AmqMessagesQueryFilter.java b/activemq-console/src/main/java/org/apache/activemq/console/filter/AmqMessagesQueryFilter.java index beaad78ae4..4f6a8adbe4 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/filter/AmqMessagesQueryFilter.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/filter/AmqMessagesQueryFilter.java @@ -18,7 +18,6 @@ package org.apache.activemq.console.filter; import java.net.URI; import java.util.Collections; -import java.util.Iterator; import java.util.List; import javax.jms.Connection; @@ -74,8 +73,8 @@ public class AmqMessagesQueryFilter extends AbstractQueryFilter { String selector = ""; // Convert to message selector - for (Iterator i = queries.iterator(); i.hasNext(); ) { - selector = selector + "(" + i.next().toString() + ") AND "; + for (Object query : queries) { + selector = selector + "(" + query.toString() + ") AND "; } // Remove last AND @@ -127,21 +126,6 @@ public class AmqMessagesQueryFilter extends AbstractQueryFilter { return null; } - /** - * Create and start a JMS connection - * - * @param brokerUrl - broker url to connect to. - * @return JMS connection - * @throws JMSException - * @deprecated Use createConnection() instead, and pass the url to the ConnectionFactory when it's created. - */ - @Deprecated - protected Connection createConnection(URI brokerUrl) throws JMSException { - // maintain old behaviour, when called this way. - connectionFactory = (new ActiveMQConnectionFactory(brokerUrl)); - return createConnection(); - } - /** * Create and start a JMS connection * @@ -150,15 +134,14 @@ public class AmqMessagesQueryFilter extends AbstractQueryFilter { */ protected Connection createConnection() throws JMSException { // maintain old behaviour, when called either way. - if (null == connectionFactory) + if (null == connectionFactory) { connectionFactory = (new ActiveMQConnectionFactory(getBrokerUrl())); - + } Connection conn = connectionFactory.createConnection(); conn.start(); return conn; } - /** * Get the broker url being used. * @@ -194,5 +177,4 @@ public class AmqMessagesQueryFilter extends AbstractQueryFilter { public void setDestination(Destination destination) { this.destination = destination; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index aabfe40556..d3a542efdd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -19,13 +19,13 @@ package org.apache.activemq.broker; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.Iterator; import java.util.LinkedList; import java.util.StringTokenizer; import java.util.concurrent.CopyOnWriteArrayList; import java.util.regex.Pattern; import javax.management.ObjectName; + import org.apache.activemq.broker.jmx.ManagedTransportConnector; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.region.ConnectorStatistics; @@ -46,13 +46,12 @@ import org.slf4j.LoggerFactory; /** * @org.apache.xbean.XBean - * */ public class TransportConnector implements Connector, BrokerServiceAware { final Logger LOG = LoggerFactory.getLogger(TransportConnector.class); - protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); + protected final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); protected TransportStatusDetector statusDector; private BrokerService brokerService; private TransportServer server; @@ -90,7 +89,6 @@ public class TransportConnector implements Connector, BrokerServiceAware { setEnableStatusMonitor(false); } } - } /** @@ -104,8 +102,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { * Factory method to create a JMX managed version of this transport * connector */ - public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) - throws IOException, URISyntaxException { + public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException { ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); rc.setBrokerInfo(getBrokerInfo()); rc.setConnectUri(getConnectUri()); @@ -136,19 +133,6 @@ public class TransportConnector implements Connector, BrokerServiceAware { this.brokerInfo = brokerInfo; } - /** - * - * @deprecated use the {@link #setBrokerService(BrokerService)} method - * instead. - */ - @Deprecated - public void setBrokerName(String name) { - if (this.brokerInfo == null) { - this.brokerInfo = new BrokerInfo(); - } - this.brokerInfo.setBrokerName(name); - } - public TransportServer getServer() throws IOException, URISyntaxException { if (server == null) { setServer(createTransportServer()); @@ -272,8 +256,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { String publishableConnectString = null; if (theConnectURI != null) { publishableConnectString = theConnectURI.toString(); - // strip off server side query parameters which may not be compatible to - // clients + // strip off server side query parameters which may not be compatible to clients if (theConnectURI.getRawQuery() != null) { publishableConnectString = publishableConnectString.substring(0, publishableConnectString .indexOf(theConnectURI.getRawQuery()) - 1); @@ -297,9 +280,8 @@ public class TransportConnector implements Connector, BrokerServiceAware { this.statusDector.stop(); } - for (Iterator iter = connections.iterator(); iter.hasNext();) { - TransportConnection c = iter.next(); - ss.stop(c); + for (TransportConnection connection : connections) { + ss.stop(connection); } server = null; ss.throwFirstException(); @@ -341,8 +323,8 @@ public class TransportConnector implements Connector, BrokerServiceAware { if (discoveryUri != null) { DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri); - if( agent!=null && agent instanceof BrokerServiceAware ) { - ((BrokerServiceAware)agent).setBrokerService(brokerService); + if (agent != null && agent instanceof BrokerServiceAware) { + ((BrokerServiceAware) agent).setBrokerService(brokerService); } return agent; @@ -428,9 +410,8 @@ public class TransportConnector implements Connector, BrokerServiceAware { control.setConnectedBrokers(connectedBrokers); control.setRebalanceConnection(rebalance); return control; - } - + public void addPeerBroker(BrokerInfo info) { if (isMatchesClusterFilter(info.getBrokerName())) { synchronized (peerBrokers) { @@ -438,7 +419,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { } } } - + public void removePeerBroker(BrokerInfo info) { synchronized (peerBrokers) { getPeerBrokers().remove(info.getBrokerURL()); @@ -455,7 +436,6 @@ public class TransportConnector implements Connector, BrokerServiceAware { } public void updateClientClusterInfo() { - if (isRebalanceClusterClients() || isUpdateClusterClients()) { ConnectionControl control = getConnectionControl(); for (Connection c : this.connections) { @@ -480,6 +460,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { } } } + return result; } @@ -620,5 +601,4 @@ public class TransportConnector implements Connector, BrokerServiceAware { public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) { this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java index b5bdde9414..650a7f57d2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java @@ -24,7 +24,7 @@ import org.apache.activemq.state.CommandVisitor; /** * @openwire:marshaller code="5" - * + * */ public class ConsumerInfo extends BaseCommand { @@ -117,7 +117,7 @@ public class ConsumerInfo extends BaseCommand { /** * Is used to uniquely identify the consumer to the broker. - * + * * @openwire:property version=1 cache=true */ public ConsumerId getConsumerId() { @@ -130,7 +130,7 @@ public class ConsumerInfo extends BaseCommand { /** * Is this consumer a queue browser? - * + * * @openwire:property version=1 */ public boolean isBrowser() { @@ -144,7 +144,7 @@ public class ConsumerInfo extends BaseCommand { /** * The destination that the consumer is interested in receiving messages * from. This destination could be a composite destination. - * + * * @openwire:property version=1 cache=true */ public ActiveMQDestination getDestination() { @@ -158,7 +158,7 @@ public class ConsumerInfo extends BaseCommand { /** * How many messages a broker will send to the client without receiving an * ack before he stops dispatching messages to the client. - * + * * @openwire:property version=1 */ public int getPrefetchSize() { @@ -173,7 +173,7 @@ public class ConsumerInfo extends BaseCommand { /** * How many messages a broker will keep around, above the prefetch limit, * for non-durable topics before starting to discard older messages. - * + * * @openwire:property version=1 */ public int getMaximumPendingMessageLimit() { @@ -190,7 +190,7 @@ public class ConsumerInfo extends BaseCommand { * done async, then he broker use a STP style of processing. STP is more * appropriate in high bandwidth situations or when being used by and in vm * transport. - * + * * @openwire:property version=1 */ public boolean isDispatchAsync() { @@ -204,7 +204,7 @@ public class ConsumerInfo extends BaseCommand { /** * The JMS selector used to filter out messages that this consumer is * interested in. - * + * * @openwire:property version=1 */ public String getSelector() { @@ -217,7 +217,7 @@ public class ConsumerInfo extends BaseCommand { /** * Used to identify the name of a durable subscription. - * + * * @openwire:property version=1 */ public String getSubscriptionName() { @@ -228,28 +228,10 @@ public class ConsumerInfo extends BaseCommand { this.subscriptionName = durableSubscriptionId; } - /** - * @deprecated - * @return - * @see getSubscriptionName - */ - public String getSubcriptionName() { - return subscriptionName; - } - - /** - * @deprecated - * @see setSubscriptionName - * @param durableSubscriptionId - */ - public void setSubcriptionName(String durableSubscriptionId) { - this.subscriptionName = durableSubscriptionId; - } - /** * Set noLocal to true to avoid receiving messages that were published * locally on the same connection. - * + * * @openwire:property version=1 */ public boolean isNoLocal() { @@ -265,7 +247,7 @@ public class ConsumerInfo extends BaseCommand { * receive messages from the destination. If there are multiple exclusive * consumers for a destination, the first one created will be the exclusive * consumer of the destination. - * + * * @openwire:property version=1 */ public boolean isExclusive() { @@ -283,7 +265,7 @@ public class ConsumerInfo extends BaseCommand { * published to the topic. If the consumer is durable then it will receive * all persistent messages that are still stored in persistent storage for * that topic. - * + * * @openwire:property version=1 */ public boolean isRetroactive() { @@ -305,7 +287,7 @@ public class ConsumerInfo extends BaseCommand { * are other higher priority consumers available to dispatch to. This allows * letting the broker to have an affinity to higher priority consumers. * Default priority is 0. - * + * * @openwire:property version=1 */ public byte getPriority() { @@ -318,7 +300,7 @@ public class ConsumerInfo extends BaseCommand { /** * The route of brokers the command has moved through. - * + * * @openwire:property version=1 cache=true */ public BrokerId[] getBrokerPath() { @@ -334,7 +316,7 @@ public class ConsumerInfo extends BaseCommand { * predicates into the selector on the fly. Handy if if say a Security * Broker interceptor wants to filter out messages based on security level * of the consumer. - * + * * @openwire:property version=1 */ public BooleanExpression getAdditionalPredicate() { @@ -396,7 +378,7 @@ public class ConsumerInfo extends BaseCommand { /** * The broker may be able to optimize it's processing or provides better QOS * if it knows the consumer will not be sending ranged acks. - * + * * @return true if the consumer will not send range acks. * @openwire:property version=1 */ @@ -423,11 +405,11 @@ public class ConsumerInfo extends BaseCommand { } } } - + public synchronized boolean isNetworkConsumersEmpty() { return networkConsumerIds == null || networkConsumerIds.isEmpty(); } - + public synchronized List getNetworkConsumerIds(){ List result = new ArrayList(); if (networkConsumerIds != null) { @@ -437,10 +419,10 @@ public class ConsumerInfo extends BaseCommand { } /** - * Tracks the original subscription id that causes a subscription to + * Tracks the original subscription id that causes a subscription to * percolate through a network when networkTTL > 1. Tracking the original * subscription allows duplicate suppression. - * + * * @return array of the current subscription path * @openwire:property version=4 */ @@ -451,7 +433,7 @@ public class ConsumerInfo extends BaseCommand { } return result; } - + public void setNetworkConsumerPath(ConsumerId[] consumerPath) { if (consumerPath != null) { for (int i=0; i 0 || !on) { - mutex.wait(); - } - on = false; - mutex.notifyAll(); - } finally { - --turningOff; - } - } - } - - /** - * Increments the use counter of the valve. This method blocks if the valve - * is off, or is being turned off. - * - * @throws InterruptedException if wait is interrupted - */ - public void increment() throws InterruptedException { - synchronized (mutex) { - if (turningOff < 0) { - throw new IllegalStateException("Unbalanced turningOff: " + turningOff); - } - if (usage < 0) { - throw new IllegalStateException("Unbalanced usage: " + usage); - } - // Do we have to wait for the value to be on? - while (turningOff > 0 || !on) { - mutex.wait(); - } - usage++; - } - } - - /** - * Decrements the use counter of the valve. - */ - public void decrement() { - synchronized (mutex) { - usage--; - if (turningOff < 0) { - throw new IllegalStateException("Unbalanced turningOff: " + turningOff); - } - if (usage < 0) { - throw new IllegalStateException("Unbalanced usage: " + usage); - } - if (turningOff > 0 && usage < 1) { - mutex.notifyAll(); - } - } - } - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java index 5b3c960262..b99ae14039 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java @@ -112,13 +112,6 @@ public abstract class TransportFactory { return tf.doBind(location); } - /** - * @deprecated - */ - public static TransportServer bind(String brokerId, URI location) throws IOException { - return bind(location); - } - public static TransportServer bind(BrokerService brokerService, URI location) throws IOException { TransportFactory tf = findTransportFactory(location); if( brokerService!=null && tf instanceof BrokerServiceAware ) { @@ -159,7 +152,6 @@ public abstract class TransportFactory { throw new IllegalArgumentException("Invalid connect parameters: " + options); } return rc; - } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } @@ -265,7 +257,7 @@ public abstract class TransportFactory { * @throws Exception */ @SuppressWarnings("rawtypes") - public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { + public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { if (options.containsKey(THREAD_NAME_FILTER)) { transport = new ThreadNameFilter(transport); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java index 90521898d3..c7d2fc8e00 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java @@ -20,16 +20,13 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; -import java.security.SecureRandom; import java.util.HashMap; import java.util.Map; import javax.net.ServerSocketFactory; import javax.net.SocketFactory; -import javax.net.ssl.KeyManager; import javax.net.ssl.SSLServerSocketFactory; import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManager; import org.apache.activemq.broker.SslContext; import org.apache.activemq.transport.Transport; @@ -46,13 +43,8 @@ import org.slf4j.LoggerFactory; * contribution from this class is that it is aware of SslTransportServer and * SslTransport classes. All Transports and TransportServers created from this * factory will have their needClientAuth option set to false. - * - * @author sepandm@gmail.com (Sepand) - * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) - * */ public class SslTransportFactory extends TcpTransportFactory { - // The log this uses., private static final Logger LOG = LoggerFactory.getLogger(SslTransportFactory.class); /** @@ -82,7 +74,6 @@ public class SslTransportFactory extends TcpTransportFactory { */ @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - SslTransport sslTransport = (SslTransport)transport.narrow(SslTransport.class); IntrospectionSupport.setProperties(sslTransport, options); @@ -138,7 +129,6 @@ public class SslTransportFactory extends TcpTransportFactory { * @throws IOException */ protected SocketFactory createSocketFactory() throws IOException { - if( SslContext.getCurrentSslContext()!=null ) { SslContext ctx = SslContext.getCurrentSslContext(); try { @@ -150,19 +140,4 @@ public class SslTransportFactory extends TcpTransportFactory { return SSLSocketFactory.getDefault(); } } - - /** - * - * @param km - * @param tm - * @param random - * @deprecated "Do not use anymore... using static initializers like this method only allows the JVM to use 1 SSL configuration per broker." - * @see org.apache.activemq.broker.SslContext#setCurrentSslContext(SslContext) - * @see org.apache.activemq.broker.SslContext#getSSLContext() - */ - public void setKeyAndTrustManagers(KeyManager[] km, TrustManager[] tm, SecureRandom random) { - SslContext ctx = new SslContext(km, tm, random); - SslContext.setCurrentSslContext(ctx); - } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java b/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java index 557397774d..9d7a25ed05 100644 --- a/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java @@ -23,29 +23,26 @@ import javax.annotation.PreDestroy; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.usage.SystemUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.CachedIntrospectionResults; /** * An ActiveMQ Message Broker. It consists of a number of transport * connectors, network connectors and a bunch of properties which can be used to * configure the broker as its lazily created. - * + * * @org.apache.xbean.XBean element="broker" rootElement="true" - * @org.apache.xbean.Defaults {code:xml} + * @org.apache.xbean.Defaults {code:xml} * * lets. * see what it includes. - * + * * {code} - * + * */ public class XBeanBrokerService extends BrokerService { - private static final transient Logger LOG = LoggerFactory.getLogger(XBeanBrokerService.class); - + private boolean start = true; - + public XBeanBrokerService() { } @@ -102,23 +99,4 @@ public class XBeanBrokerService extends BrokerService { public void setStart(boolean start) { this.start = start; } - - /** - * Sets whether the broker should shutdown the ApplicationContext when the broker jvm is shutdown. - * The broker can be stopped because the underlying JDBC store is unavailable for example. - */ - @Deprecated - public void setDestroyApplicationContextOnShutdown(boolean destroy) { - LOG.warn("destroyApplicationContextOnShutdown parameter is deprecated, please use shutdown hooks instead"); - } - - /** - * Sets whether the broker should shutdown the ApplicationContext when the broker is stopped. - * The broker can be stopped because the underlying JDBC store is unavailable for example. - */ - @Deprecated - public void setDestroyApplicationContextOnStop(boolean destroy) { - LOG.warn("destroyApplicationContextOnStop parameter is deprecated, please use shutdown hooks instead"); - } - } diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java deleted file mode 100755 index 9b5ce35ae8..0000000000 --- a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.http; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URL; - -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.transport.util.TextWireFormat; -import org.apache.activemq.util.ByteArrayOutputStream; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.Callback; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * @deprecated - * @see HttpClientTransport - */ -@Deprecated -public class HttpTransport extends HttpTransportSupport { - - private static final Logger LOG = LoggerFactory.getLogger(HttpTransport.class); - - private HttpURLConnection sendConnection; - private HttpURLConnection receiveConnection; - private URL url; - private String clientID; - private volatile int receiveCounter; - - // private String sessionID; - - public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException { - super(wireFormat, remoteUrl); - url = new URL(remoteUrl.toString()); - } - - public void oneway(Object o) throws IOException { - final Command command = (Command)o; - try { - if (command.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) { - boolean startGetThread = clientID == null; - clientID = ((ConnectionInfo)command).getClientId(); - if (startGetThread && isStarted()) { - try { - super.doStart(); - } catch (Exception e) { - throw IOExceptionSupport.create(e); - } - } - } - - HttpURLConnection connection = getSendConnection(); - String text = getTextWireFormat().marshalText(command); - Writer writer = new OutputStreamWriter(connection.getOutputStream()); - writer.write(text); - writer.flush(); - int answer = connection.getResponseCode(); - if (answer != HttpURLConnection.HTTP_OK) { - throw new IOException("Failed to post command: " + command + " as response was: " + answer); - } - // checkSession(connection); - } catch (IOException e) { - throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); - } - } - - public void run() { - LOG.trace("HTTP GET consumer thread starting for transport: " + this); - URI remoteUrl = getRemoteUrl(); - while (!isStopped()) { - try { - HttpURLConnection connection = getReceiveConnection(); - int answer = connection.getResponseCode(); - if (answer != HttpURLConnection.HTTP_OK) { - if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) { - LOG.trace("GET timed out"); - } else { - LOG.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer); - } - } else { - // checkSession(connection); - - // Create a String for the UTF content - receiveCounter++; - InputStream is = connection.getInputStream(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength() > 0 ? connection.getContentLength() : 1024); - int c = 0; - while ((c = is.read()) >= 0) { - baos.write(c); - } - ByteSequence sequence = baos.toByteSequence(); - String data = new String(sequence.data, sequence.offset, sequence.length, "UTF-8"); - - Command command = (Command)getTextWireFormat().unmarshalText(data); - - if (command == null) { - LOG.warn("Received null packet from url: " + remoteUrl); - } else { - doConsume(command); - } - } - } catch (Throwable e) { - if (!isStopped()) { - LOG.error("Failed to perform GET on: " + remoteUrl + " due to: " + e, e); - } else { - LOG.trace("Caught error after closed: " + e, e); - } - } finally { - safeClose(receiveConnection); - receiveConnection = null; - } - } - } - - // Implementation methods - // ------------------------------------------------------------------------- - protected HttpURLConnection createSendConnection() throws IOException { - HttpURLConnection conn = (HttpURLConnection)getRemoteURL().openConnection(); - conn.setDoOutput(true); - conn.setRequestMethod("POST"); - configureConnection(conn); - conn.connect(); - return conn; - } - - protected HttpURLConnection createReceiveConnection() throws IOException { - HttpURLConnection conn = (HttpURLConnection)getRemoteURL().openConnection(); - conn.setDoOutput(false); - conn.setDoInput(true); - conn.setRequestMethod("GET"); - configureConnection(conn); - conn.connect(); - return conn; - } - - // protected void checkSession(HttpURLConnection connection) - // { - // String set_cookie=connection.getHeaderField("Set-Cookie"); - // if (set_cookie!=null && set_cookie.startsWith("JSESSIONID=")) - // { - // String[] bits=set_cookie.split("[=;]"); - // sessionID=bits[1]; - // } - // } - - protected void configureConnection(HttpURLConnection connection) { - // if (sessionID !=null) { - // connection.addRequestProperty("Cookie", "JSESSIONID="+sessionID); - // } - // else - if (clientID != null) { - connection.setRequestProperty("clientID", clientID); - } - } - - protected URL getRemoteURL() { - return url; - } - - protected HttpURLConnection getSendConnection() throws IOException { - setSendConnection(createSendConnection()); - return sendConnection; - } - - protected HttpURLConnection getReceiveConnection() throws IOException { - setReceiveConnection(createReceiveConnection()); - return receiveConnection; - } - - protected void setSendConnection(HttpURLConnection conn) { - safeClose(sendConnection); - sendConnection = conn; - } - - protected void setReceiveConnection(HttpURLConnection conn) { - safeClose(receiveConnection); - receiveConnection = conn; - } - - protected void doStart() throws Exception { - // Don't start the background thread until the clientId has been - // established. - if (clientID != null) { - super.doStart(); - } - } - - protected void doStop(ServiceStopper stopper) throws Exception { - stopper.run(new Callback() { - public void execute() throws Exception { - safeClose(sendConnection); - } - }); - sendConnection = null; - stopper.run(new Callback() { - public void execute() { - safeClose(receiveConnection); - } - }); - } - - /** - * @param connection TODO - */ - private void safeClose(HttpURLConnection connection) { - if (connection != null) { - connection.disconnect(); - } - } - - public int getReceiveCounter() { - return receiveCounter; - } - -} diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransport.java b/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransport.java deleted file mode 100755 index 21c1bc5a0c..0000000000 --- a/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransport.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.https; - -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URI; -import javax.net.ssl.HttpsURLConnection; - -import org.apache.activemq.transport.http.HttpTransport; -import org.apache.activemq.transport.util.TextWireFormat; - -/** - * @deprecated - * @see HttpsClientTransport - */ -@Deprecated -public class HttpsTransport extends HttpTransport { - - public HttpsTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException { - super(wireFormat, remoteUrl); - } - - protected synchronized HttpURLConnection createSendConnection() throws IOException { - HttpsURLConnection conn = (HttpsURLConnection) getRemoteURL().openConnection(); - conn.setDoOutput(true); - conn.setRequestMethod("POST"); - configureConnection(conn); - conn.connect(); - return conn; - } - - protected synchronized HttpURLConnection createReceiveConnection() throws IOException { - HttpsURLConnection conn = (HttpsURLConnection) getRemoteURL().openConnection(); - conn.setDoOutput(false); - conn.setDoInput(true); - conn.setRequestMethod("GET"); - configureConnection(conn); - conn.connect(); - return conn; - } - -} diff --git a/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java b/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java deleted file mode 100644 index 29df2ab479..0000000000 --- a/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.util.xstream; - -import java.io.Serializable; -import java.io.StringReader; -import java.io.StringWriter; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.TextMessage; - -import com.thoughtworks.xstream.XStream; -import com.thoughtworks.xstream.io.HierarchicalStreamDriver; -import com.thoughtworks.xstream.io.HierarchicalStreamReader; -import com.thoughtworks.xstream.io.HierarchicalStreamWriter; -import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; -import com.thoughtworks.xstream.io.xml.XppReader; -import org.apache.activemq.MessageTransformerSupport; - -/** - * Transforms object messages to text messages and vice versa using - * {@link XStream} - * - * @deprecated as of 5.3.0 release replaced by {@link org.apache.activemq.util.oxm.XStreamMessageTransformer} - * - * - */ -@Deprecated -public class XStreamMessageTransformer extends MessageTransformerSupport { - - protected MessageTransform transformType; - private XStream xStream; - - /** - * Specialized driver to be used with stream readers and writers - */ - private HierarchicalStreamDriver streamDriver; - - /** - * Defines the type of transformation. If XML (default), - producer - * transformation transforms from Object to XML. - consumer transformation - * transforms from XML to Object. If OBJECT, - producer transformation - * transforms from XML to Object. - consumer transformation transforms from - * Object to XML. If ADAPTIVE, - producer transformation transforms from - * Object to XML, or XML to Object depending on the type of the original - * message - consumer transformation transforms from XML to Object, or - * Object to XML depending on the type of the original message - */ - public enum MessageTransform { - XML, OBJECT, ADAPTIVE - }; - - - public XStreamMessageTransformer() { - this(MessageTransform.XML); - } - - public XStreamMessageTransformer(MessageTransform transformType) { - this.transformType = transformType; - } - - public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException { - switch (transformType) { - case XML: - return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : message; - case OBJECT: - return (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message; - case ADAPTIVE: - return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message; - default: - } - return message; - } - - public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException { - switch (transformType) { - case XML: - return (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message; - case OBJECT: - return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : message; - case ADAPTIVE: - return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message; - default: - } - return message; - } - - // Properties - // ------------------------------------------------------------------------- - public XStream getXStream() { - if (xStream == null) { - xStream = createXStream(); - } - return xStream; - } - - public void setXStream(XStream xStream) { - this.xStream = xStream; - } - - public HierarchicalStreamDriver getStreamDriver() { - return streamDriver; - } - - public void setStreamDriver(HierarchicalStreamDriver streamDriver) { - this.streamDriver = streamDriver; - } - - // Implementation methods - // ------------------------------------------------------------------------- - protected XStream createXStream() { - return new XStream(); - } - - public MessageTransform getTransformType() { - return transformType; - } - - public void setTransformType(MessageTransform transformType) { - this.transformType = transformType; - } - - /** - * Transforms an incoming XML encoded {@link TextMessage} to an - * {@link ObjectMessage} - * - * @param session - JMS session currently being used - * @param textMessage - text message to transform to object message - * @return ObjectMessage - * @throws JMSException - */ - protected ObjectMessage textToObject(Session session, TextMessage textMessage) throws JMSException { - Object object = unmarshall(session, textMessage); - if (object instanceof Serializable) { - ObjectMessage answer = session.createObjectMessage((Serializable)object); - copyProperties(textMessage, answer); - return answer; - } else { - throw new JMSException("Object is not serializable: " + object); - } - } - - /** - * Transforms an incoming {@link ObjectMessage} to an XML encoded - * {@link TextMessage} - * - * @param session - JMS session currently being used - * @param objectMessage - object message to transform to text message - * @return XML encoded TextMessage - * @throws JMSException - */ - protected TextMessage objectToText(Session session, ObjectMessage objectMessage) throws JMSException { - TextMessage answer = session.createTextMessage(marshall(session, objectMessage)); - copyProperties(objectMessage, answer); - return answer; - } - - /** - * Marshalls the Object in the {@link ObjectMessage} to a string using XML - * encoding - */ - protected String marshall(Session session, ObjectMessage objectMessage) throws JMSException { - Serializable object = objectMessage.getObject(); - StringWriter buffer = new StringWriter(); - HierarchicalStreamWriter out; - if (streamDriver != null) { - out = streamDriver.createWriter(buffer); - } else { - out = new PrettyPrintWriter(buffer); - } - getXStream().marshal(object, out); - return buffer.toString(); - } - - /** - * Unmarshalls the XML encoded message in the {@link TextMessage} to an - * Object - */ - protected Object unmarshall(Session session, TextMessage textMessage) throws JMSException { - HierarchicalStreamReader in; - if (streamDriver != null) { - in = streamDriver.createReader(new StringReader(textMessage.getText())); - } else { - in = new XppReader(new StringReader(textMessage.getText())); - } - return getXStream().unmarshal(in); - } - -}