Remove old deprecated code.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1387960 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-09-20 10:48:56 +00:00
parent 11e6f88ded
commit 6f2ac637cd
10 changed files with 44 additions and 782 deletions

View File

@ -18,7 +18,6 @@ package org.apache.activemq.console.filter;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.jms.Connection;
@ -74,8 +73,8 @@ public class AmqMessagesQueryFilter extends AbstractQueryFilter {
String selector = "";
// Convert to message selector
for (Iterator i = queries.iterator(); i.hasNext(); ) {
selector = selector + "(" + i.next().toString() + ") AND ";
for (Object query : queries) {
selector = selector + "(" + query.toString() + ") AND ";
}
// Remove last AND
@ -127,21 +126,6 @@ public class AmqMessagesQueryFilter extends AbstractQueryFilter {
return null;
}
/**
* Create and start a JMS connection
*
* @param brokerUrl - broker url to connect to.
* @return JMS connection
* @throws JMSException
* @deprecated Use createConnection() instead, and pass the url to the ConnectionFactory when it's created.
*/
@Deprecated
protected Connection createConnection(URI brokerUrl) throws JMSException {
// maintain old behaviour, when called this way.
connectionFactory = (new ActiveMQConnectionFactory(brokerUrl));
return createConnection();
}
/**
* Create and start a JMS connection
*
@ -150,15 +134,14 @@ public class AmqMessagesQueryFilter extends AbstractQueryFilter {
*/
protected Connection createConnection() throws JMSException {
// maintain old behaviour, when called either way.
if (null == connectionFactory)
if (null == connectionFactory) {
connectionFactory = (new ActiveMQConnectionFactory(getBrokerUrl()));
}
Connection conn = connectionFactory.createConnection();
conn.start();
return conn;
}
/**
* Get the broker url being used.
*
@ -194,5 +177,4 @@ public class AmqMessagesQueryFilter extends AbstractQueryFilter {
public void setDestination(Destination destination) {
this.destination = destination;
}
}

View File

@ -19,13 +19,13 @@ package org.apache.activemq.broker;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
@ -46,13 +46,12 @@ import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
*
*/
public class TransportConnector implements Connector, BrokerServiceAware {
final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
protected TransportStatusDetector statusDector;
private BrokerService brokerService;
private TransportServer server;
@ -90,7 +89,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
setEnableStatusMonitor(false);
}
}
}
/**
@ -104,8 +102,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
* Factory method to create a JMX managed version of this transport
* connector
*/
public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
throws IOException, URISyntaxException {
public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
rc.setBrokerInfo(getBrokerInfo());
rc.setConnectUri(getConnectUri());
@ -136,19 +133,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
this.brokerInfo = brokerInfo;
}
/**
*
* @deprecated use the {@link #setBrokerService(BrokerService)} method
* instead.
*/
@Deprecated
public void setBrokerName(String name) {
if (this.brokerInfo == null) {
this.brokerInfo = new BrokerInfo();
}
this.brokerInfo.setBrokerName(name);
}
public TransportServer getServer() throws IOException, URISyntaxException {
if (server == null) {
setServer(createTransportServer());
@ -272,8 +256,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
String publishableConnectString = null;
if (theConnectURI != null) {
publishableConnectString = theConnectURI.toString();
// strip off server side query parameters which may not be compatible to
// clients
// strip off server side query parameters which may not be compatible to clients
if (theConnectURI.getRawQuery() != null) {
publishableConnectString = publishableConnectString.substring(0, publishableConnectString
.indexOf(theConnectURI.getRawQuery()) - 1);
@ -297,9 +280,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
this.statusDector.stop();
}
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
TransportConnection c = iter.next();
ss.stop(c);
for (TransportConnection connection : connections) {
ss.stop(connection);
}
server = null;
ss.throwFirstException();
@ -341,8 +323,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
if (discoveryUri != null) {
DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
if( agent!=null && agent instanceof BrokerServiceAware ) {
((BrokerServiceAware)agent).setBrokerService(brokerService);
if (agent != null && agent instanceof BrokerServiceAware) {
((BrokerServiceAware) agent).setBrokerService(brokerService);
}
return agent;
@ -428,9 +410,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
control.setConnectedBrokers(connectedBrokers);
control.setRebalanceConnection(rebalance);
return control;
}
public void addPeerBroker(BrokerInfo info) {
if (isMatchesClusterFilter(info.getBrokerName())) {
synchronized (peerBrokers) {
@ -438,7 +419,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
}
}
}
public void removePeerBroker(BrokerInfo info) {
synchronized (peerBrokers) {
getPeerBrokers().remove(info.getBrokerURL());
@ -455,7 +436,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
}
public void updateClientClusterInfo() {
if (isRebalanceClusterClients() || isUpdateClusterClients()) {
ConnectionControl control = getConnectionControl();
for (Connection c : this.connections) {
@ -480,6 +460,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
}
}
}
return result;
}
@ -620,5 +601,4 @@ public class TransportConnector implements Connector, BrokerServiceAware {
public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
}
}

View File

@ -24,7 +24,7 @@ import org.apache.activemq.state.CommandVisitor;
/**
* @openwire:marshaller code="5"
*
*
*/
public class ConsumerInfo extends BaseCommand {
@ -117,7 +117,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* Is used to uniquely identify the consumer to the broker.
*
*
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId() {
@ -130,7 +130,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* Is this consumer a queue browser?
*
*
* @openwire:property version=1
*/
public boolean isBrowser() {
@ -144,7 +144,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* The destination that the consumer is interested in receiving messages
* from. This destination could be a composite destination.
*
*
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
@ -158,7 +158,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* How many messages a broker will send to the client without receiving an
* ack before he stops dispatching messages to the client.
*
*
* @openwire:property version=1
*/
public int getPrefetchSize() {
@ -173,7 +173,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* How many messages a broker will keep around, above the prefetch limit,
* for non-durable topics before starting to discard older messages.
*
*
* @openwire:property version=1
*/
public int getMaximumPendingMessageLimit() {
@ -190,7 +190,7 @@ public class ConsumerInfo extends BaseCommand {
* done async, then he broker use a STP style of processing. STP is more
* appropriate in high bandwidth situations or when being used by and in vm
* transport.
*
*
* @openwire:property version=1
*/
public boolean isDispatchAsync() {
@ -204,7 +204,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* The JMS selector used to filter out messages that this consumer is
* interested in.
*
*
* @openwire:property version=1
*/
public String getSelector() {
@ -217,7 +217,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* Used to identify the name of a durable subscription.
*
*
* @openwire:property version=1
*/
public String getSubscriptionName() {
@ -228,28 +228,10 @@ public class ConsumerInfo extends BaseCommand {
this.subscriptionName = durableSubscriptionId;
}
/**
* @deprecated
* @return
* @see getSubscriptionName
*/
public String getSubcriptionName() {
return subscriptionName;
}
/**
* @deprecated
* @see setSubscriptionName
* @param durableSubscriptionId
*/
public void setSubcriptionName(String durableSubscriptionId) {
this.subscriptionName = durableSubscriptionId;
}
/**
* Set noLocal to true to avoid receiving messages that were published
* locally on the same connection.
*
*
* @openwire:property version=1
*/
public boolean isNoLocal() {
@ -265,7 +247,7 @@ public class ConsumerInfo extends BaseCommand {
* receive messages from the destination. If there are multiple exclusive
* consumers for a destination, the first one created will be the exclusive
* consumer of the destination.
*
*
* @openwire:property version=1
*/
public boolean isExclusive() {
@ -283,7 +265,7 @@ public class ConsumerInfo extends BaseCommand {
* published to the topic. If the consumer is durable then it will receive
* all persistent messages that are still stored in persistent storage for
* that topic.
*
*
* @openwire:property version=1
*/
public boolean isRetroactive() {
@ -305,7 +287,7 @@ public class ConsumerInfo extends BaseCommand {
* are other higher priority consumers available to dispatch to. This allows
* letting the broker to have an affinity to higher priority consumers.
* Default priority is 0.
*
*
* @openwire:property version=1
*/
public byte getPriority() {
@ -318,7 +300,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* The route of brokers the command has moved through.
*
*
* @openwire:property version=1 cache=true
*/
public BrokerId[] getBrokerPath() {
@ -334,7 +316,7 @@ public class ConsumerInfo extends BaseCommand {
* predicates into the selector on the fly. Handy if if say a Security
* Broker interceptor wants to filter out messages based on security level
* of the consumer.
*
*
* @openwire:property version=1
*/
public BooleanExpression getAdditionalPredicate() {
@ -396,7 +378,7 @@ public class ConsumerInfo extends BaseCommand {
/**
* The broker may be able to optimize it's processing or provides better QOS
* if it knows the consumer will not be sending ranged acks.
*
*
* @return true if the consumer will not send range acks.
* @openwire:property version=1
*/
@ -423,11 +405,11 @@ public class ConsumerInfo extends BaseCommand {
}
}
}
public synchronized boolean isNetworkConsumersEmpty() {
return networkConsumerIds == null || networkConsumerIds.isEmpty();
}
public synchronized List<ConsumerId> getNetworkConsumerIds(){
List<ConsumerId> result = new ArrayList<ConsumerId>();
if (networkConsumerIds != null) {
@ -437,10 +419,10 @@ public class ConsumerInfo extends BaseCommand {
}
/**
* Tracks the original subscription id that causes a subscription to
* Tracks the original subscription id that causes a subscription to
* percolate through a network when networkTTL > 1. Tracking the original
* subscription allows duplicate suppression.
*
*
* @return array of the current subscription path
* @openwire:property version=4
*/
@ -451,7 +433,7 @@ public class ConsumerInfo extends BaseCommand {
}
return result;
}
public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
if (consumerPath != null) {
for (int i=0; i<consumerPath.length; i++) {
@ -463,7 +445,7 @@ public class ConsumerInfo extends BaseCommand {
public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
this.lastDeliveredSequenceId = lastDeliveredSequenceId;
}
public long getLastDeliveredSequenceId() {
return lastDeliveredSequenceId;
}

View File

@ -1,122 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.thread;
/**
* A Valve is a synchronization object used enable or disable the "flow" of
* concurrent processing.
*
* @deprecated
*/
@Deprecated
public final class Valve {
private final Object mutex = new Object();
private boolean on;
private int turningOff;
private int usage;
public Valve(boolean on) {
this.on = on;
}
/**
* Turns the valve on. This method blocks until the valve is off.
*
* @throws InterruptedException if wait is interrupted
*/
public void turnOn() throws InterruptedException {
synchronized (mutex) {
while (on) {
mutex.wait();
}
on = true;
mutex.notifyAll();
}
}
public boolean isOn() {
synchronized (mutex) {
return on;
}
}
/**
* Turns the valve off. This method blocks until the valve is on and the
* valve is not in use.
*
* @throws InterruptedException if wait is interrupted
*/
public void turnOff() throws InterruptedException {
synchronized (mutex) {
if (turningOff < 0) {
throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
}
try {
++turningOff;
while (usage > 0 || !on) {
mutex.wait();
}
on = false;
mutex.notifyAll();
} finally {
--turningOff;
}
}
}
/**
* Increments the use counter of the valve. This method blocks if the valve
* is off, or is being turned off.
*
* @throws InterruptedException if wait is interrupted
*/
public void increment() throws InterruptedException {
synchronized (mutex) {
if (turningOff < 0) {
throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
}
if (usage < 0) {
throw new IllegalStateException("Unbalanced usage: " + usage);
}
// Do we have to wait for the value to be on?
while (turningOff > 0 || !on) {
mutex.wait();
}
usage++;
}
}
/**
* Decrements the use counter of the valve.
*/
public void decrement() {
synchronized (mutex) {
usage--;
if (turningOff < 0) {
throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
}
if (usage < 0) {
throw new IllegalStateException("Unbalanced usage: " + usage);
}
if (turningOff > 0 && usage < 1) {
mutex.notifyAll();
}
}
}
}

View File

@ -112,13 +112,6 @@ public abstract class TransportFactory {
return tf.doBind(location);
}
/**
* @deprecated
*/
public static TransportServer bind(String brokerId, URI location) throws IOException {
return bind(location);
}
public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
TransportFactory tf = findTransportFactory(location);
if( brokerService!=null && tf instanceof BrokerServiceAware ) {
@ -159,7 +152,6 @@ public abstract class TransportFactory {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
return rc;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
@ -265,7 +257,7 @@ public abstract class TransportFactory {
* @throws Exception
*/
@SuppressWarnings("rawtypes")
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
if (options.containsKey(THREAD_NAME_FILTER)) {
transport = new ThreadNameFilter(transport);
}

View File

@ -20,16 +20,13 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
@ -46,13 +43,8 @@ import org.slf4j.LoggerFactory;
* contribution from this class is that it is aware of SslTransportServer and
* SslTransport classes. All Transports and TransportServers created from this
* factory will have their needClientAuth option set to false.
*
* @author sepandm@gmail.com (Sepand)
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
*
*/
public class SslTransportFactory extends TcpTransportFactory {
// The log this uses.,
private static final Logger LOG = LoggerFactory.getLogger(SslTransportFactory.class);
/**
@ -82,7 +74,6 @@ public class SslTransportFactory extends TcpTransportFactory {
*/
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
SslTransport sslTransport = (SslTransport)transport.narrow(SslTransport.class);
IntrospectionSupport.setProperties(sslTransport, options);
@ -138,7 +129,6 @@ public class SslTransportFactory extends TcpTransportFactory {
* @throws IOException
*/
protected SocketFactory createSocketFactory() throws IOException {
if( SslContext.getCurrentSslContext()!=null ) {
SslContext ctx = SslContext.getCurrentSslContext();
try {
@ -150,19 +140,4 @@ public class SslTransportFactory extends TcpTransportFactory {
return SSLSocketFactory.getDefault();
}
}
/**
*
* @param km
* @param tm
* @param random
* @deprecated "Do not use anymore... using static initializers like this method only allows the JVM to use 1 SSL configuration per broker."
* @see org.apache.activemq.broker.SslContext#setCurrentSslContext(SslContext)
* @see org.apache.activemq.broker.SslContext#getSSLContext()
*/
public void setKeyAndTrustManagers(KeyManager[] km, TrustManager[] tm, SecureRandom random) {
SslContext ctx = new SslContext(km, tm, random);
SslContext.setCurrentSslContext(ctx);
}
}

View File

@ -23,29 +23,26 @@ import javax.annotation.PreDestroy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.CachedIntrospectionResults;
/**
* An ActiveMQ Message Broker. It consists of a number of transport
* connectors, network connectors and a bunch of properties which can be used to
* configure the broker as its lazily created.
*
*
* @org.apache.xbean.XBean element="broker" rootElement="true"
* @org.apache.xbean.Defaults {code:xml}
* @org.apache.xbean.Defaults {code:xml}
* <broker test="foo.bar">
* lets.
* see what it includes.
* </broker>
* </broker>
* {code}
*
*
*/
public class XBeanBrokerService extends BrokerService {
private static final transient Logger LOG = LoggerFactory.getLogger(XBeanBrokerService.class);
private boolean start = true;
public XBeanBrokerService() {
}
@ -102,23 +99,4 @@ public class XBeanBrokerService extends BrokerService {
public void setStart(boolean start) {
this.start = start;
}
/**
* Sets whether the broker should shutdown the ApplicationContext when the broker jvm is shutdown.
* The broker can be stopped because the underlying JDBC store is unavailable for example.
*/
@Deprecated
public void setDestroyApplicationContextOnShutdown(boolean destroy) {
LOG.warn("destroyApplicationContextOnShutdown parameter is deprecated, please use shutdown hooks instead");
}
/**
* Sets whether the broker should shutdown the ApplicationContext when the broker is stopped.
* The broker can be stopped because the underlying JDBC store is unavailable for example.
*/
@Deprecated
public void setDestroyApplicationContextOnStop(boolean destroy) {
LOG.warn("destroyApplicationContextOnStop parameter is deprecated, please use shutdown hooks instead");
}
}

View File

@ -1,240 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.http;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @deprecated
* @see HttpClientTransport
*/
@Deprecated
public class HttpTransport extends HttpTransportSupport {
private static final Logger LOG = LoggerFactory.getLogger(HttpTransport.class);
private HttpURLConnection sendConnection;
private HttpURLConnection receiveConnection;
private URL url;
private String clientID;
private volatile int receiveCounter;
// private String sessionID;
public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException {
super(wireFormat, remoteUrl);
url = new URL(remoteUrl.toString());
}
public void oneway(Object o) throws IOException {
final Command command = (Command)o;
try {
if (command.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
boolean startGetThread = clientID == null;
clientID = ((ConnectionInfo)command).getClientId();
if (startGetThread && isStarted()) {
try {
super.doStart();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
}
HttpURLConnection connection = getSendConnection();
String text = getTextWireFormat().marshalText(command);
Writer writer = new OutputStreamWriter(connection.getOutputStream());
writer.write(text);
writer.flush();
int answer = connection.getResponseCode();
if (answer != HttpURLConnection.HTTP_OK) {
throw new IOException("Failed to post command: " + command + " as response was: " + answer);
}
// checkSession(connection);
} catch (IOException e) {
throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
}
}
public void run() {
LOG.trace("HTTP GET consumer thread starting for transport: " + this);
URI remoteUrl = getRemoteUrl();
while (!isStopped()) {
try {
HttpURLConnection connection = getReceiveConnection();
int answer = connection.getResponseCode();
if (answer != HttpURLConnection.HTTP_OK) {
if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) {
LOG.trace("GET timed out");
} else {
LOG.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
}
} else {
// checkSession(connection);
// Create a String for the UTF content
receiveCounter++;
InputStream is = connection.getInputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength() > 0 ? connection.getContentLength() : 1024);
int c = 0;
while ((c = is.read()) >= 0) {
baos.write(c);
}
ByteSequence sequence = baos.toByteSequence();
String data = new String(sequence.data, sequence.offset, sequence.length, "UTF-8");
Command command = (Command)getTextWireFormat().unmarshalText(data);
if (command == null) {
LOG.warn("Received null packet from url: " + remoteUrl);
} else {
doConsume(command);
}
}
} catch (Throwable e) {
if (!isStopped()) {
LOG.error("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
} else {
LOG.trace("Caught error after closed: " + e, e);
}
} finally {
safeClose(receiveConnection);
receiveConnection = null;
}
}
}
// Implementation methods
// -------------------------------------------------------------------------
protected HttpURLConnection createSendConnection() throws IOException {
HttpURLConnection conn = (HttpURLConnection)getRemoteURL().openConnection();
conn.setDoOutput(true);
conn.setRequestMethod("POST");
configureConnection(conn);
conn.connect();
return conn;
}
protected HttpURLConnection createReceiveConnection() throws IOException {
HttpURLConnection conn = (HttpURLConnection)getRemoteURL().openConnection();
conn.setDoOutput(false);
conn.setDoInput(true);
conn.setRequestMethod("GET");
configureConnection(conn);
conn.connect();
return conn;
}
// protected void checkSession(HttpURLConnection connection)
// {
// String set_cookie=connection.getHeaderField("Set-Cookie");
// if (set_cookie!=null && set_cookie.startsWith("JSESSIONID="))
// {
// String[] bits=set_cookie.split("[=;]");
// sessionID=bits[1];
// }
// }
protected void configureConnection(HttpURLConnection connection) {
// if (sessionID !=null) {
// connection.addRequestProperty("Cookie", "JSESSIONID="+sessionID);
// }
// else
if (clientID != null) {
connection.setRequestProperty("clientID", clientID);
}
}
protected URL getRemoteURL() {
return url;
}
protected HttpURLConnection getSendConnection() throws IOException {
setSendConnection(createSendConnection());
return sendConnection;
}
protected HttpURLConnection getReceiveConnection() throws IOException {
setReceiveConnection(createReceiveConnection());
return receiveConnection;
}
protected void setSendConnection(HttpURLConnection conn) {
safeClose(sendConnection);
sendConnection = conn;
}
protected void setReceiveConnection(HttpURLConnection conn) {
safeClose(receiveConnection);
receiveConnection = conn;
}
protected void doStart() throws Exception {
// Don't start the background thread until the clientId has been
// established.
if (clientID != null) {
super.doStart();
}
}
protected void doStop(ServiceStopper stopper) throws Exception {
stopper.run(new Callback() {
public void execute() throws Exception {
safeClose(sendConnection);
}
});
sendConnection = null;
stopper.run(new Callback() {
public void execute() {
safeClose(receiveConnection);
}
});
}
/**
* @param connection TODO
*/
private void safeClose(HttpURLConnection connection) {
if (connection != null) {
connection.disconnect();
}
}
public int getReceiveCounter() {
return receiveCounter;
}
}

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.https;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URI;
import javax.net.ssl.HttpsURLConnection;
import org.apache.activemq.transport.http.HttpTransport;
import org.apache.activemq.transport.util.TextWireFormat;
/**
* @deprecated
* @see HttpsClientTransport
*/
@Deprecated
public class HttpsTransport extends HttpTransport {
public HttpsTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException {
super(wireFormat, remoteUrl);
}
protected synchronized HttpURLConnection createSendConnection() throws IOException {
HttpsURLConnection conn = (HttpsURLConnection) getRemoteURL().openConnection();
conn.setDoOutput(true);
conn.setRequestMethod("POST");
configureConnection(conn);
conn.connect();
return conn;
}
protected synchronized HttpURLConnection createReceiveConnection() throws IOException {
HttpsURLConnection conn = (HttpsURLConnection) getRemoteURL().openConnection();
conn.setDoOutput(false);
conn.setDoInput(true);
conn.setRequestMethod("GET");
configureConnection(conn);
conn.connect();
return conn;
}
}

View File

@ -1,207 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.xstream;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamDriver;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
import com.thoughtworks.xstream.io.xml.XppReader;
import org.apache.activemq.MessageTransformerSupport;
/**
* Transforms object messages to text messages and vice versa using
* {@link XStream}
*
* @deprecated as of 5.3.0 release replaced by {@link org.apache.activemq.util.oxm.XStreamMessageTransformer}
*
*
*/
@Deprecated
public class XStreamMessageTransformer extends MessageTransformerSupport {
protected MessageTransform transformType;
private XStream xStream;
/**
* Specialized driver to be used with stream readers and writers
*/
private HierarchicalStreamDriver streamDriver;
/**
* Defines the type of transformation. If XML (default), - producer
* transformation transforms from Object to XML. - consumer transformation
* transforms from XML to Object. If OBJECT, - producer transformation
* transforms from XML to Object. - consumer transformation transforms from
* Object to XML. If ADAPTIVE, - producer transformation transforms from
* Object to XML, or XML to Object depending on the type of the original
* message - consumer transformation transforms from XML to Object, or
* Object to XML depending on the type of the original message
*/
public enum MessageTransform {
XML, OBJECT, ADAPTIVE
};
public XStreamMessageTransformer() {
this(MessageTransform.XML);
}
public XStreamMessageTransformer(MessageTransform transformType) {
this.transformType = transformType;
}
public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException {
switch (transformType) {
case XML:
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : message;
case OBJECT:
return (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
case ADAPTIVE:
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
default:
}
return message;
}
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
switch (transformType) {
case XML:
return (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
case OBJECT:
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : message;
case ADAPTIVE:
return (message instanceof TextMessage) ? textToObject(session, (TextMessage)message) : (message instanceof ObjectMessage) ? objectToText(session, (ObjectMessage)message) : message;
default:
}
return message;
}
// Properties
// -------------------------------------------------------------------------
public XStream getXStream() {
if (xStream == null) {
xStream = createXStream();
}
return xStream;
}
public void setXStream(XStream xStream) {
this.xStream = xStream;
}
public HierarchicalStreamDriver getStreamDriver() {
return streamDriver;
}
public void setStreamDriver(HierarchicalStreamDriver streamDriver) {
this.streamDriver = streamDriver;
}
// Implementation methods
// -------------------------------------------------------------------------
protected XStream createXStream() {
return new XStream();
}
public MessageTransform getTransformType() {
return transformType;
}
public void setTransformType(MessageTransform transformType) {
this.transformType = transformType;
}
/**
* Transforms an incoming XML encoded {@link TextMessage} to an
* {@link ObjectMessage}
*
* @param session - JMS session currently being used
* @param textMessage - text message to transform to object message
* @return ObjectMessage
* @throws JMSException
*/
protected ObjectMessage textToObject(Session session, TextMessage textMessage) throws JMSException {
Object object = unmarshall(session, textMessage);
if (object instanceof Serializable) {
ObjectMessage answer = session.createObjectMessage((Serializable)object);
copyProperties(textMessage, answer);
return answer;
} else {
throw new JMSException("Object is not serializable: " + object);
}
}
/**
* Transforms an incoming {@link ObjectMessage} to an XML encoded
* {@link TextMessage}
*
* @param session - JMS session currently being used
* @param objectMessage - object message to transform to text message
* @return XML encoded TextMessage
* @throws JMSException
*/
protected TextMessage objectToText(Session session, ObjectMessage objectMessage) throws JMSException {
TextMessage answer = session.createTextMessage(marshall(session, objectMessage));
copyProperties(objectMessage, answer);
return answer;
}
/**
* Marshalls the Object in the {@link ObjectMessage} to a string using XML
* encoding
*/
protected String marshall(Session session, ObjectMessage objectMessage) throws JMSException {
Serializable object = objectMessage.getObject();
StringWriter buffer = new StringWriter();
HierarchicalStreamWriter out;
if (streamDriver != null) {
out = streamDriver.createWriter(buffer);
} else {
out = new PrettyPrintWriter(buffer);
}
getXStream().marshal(object, out);
return buffer.toString();
}
/**
* Unmarshalls the XML encoded message in the {@link TextMessage} to an
* Object
*/
protected Object unmarshall(Session session, TextMessage textMessage) throws JMSException {
HierarchicalStreamReader in;
if (streamDriver != null) {
in = streamDriver.createReader(new StringReader(textMessage.getText()));
} else {
in = new XppReader(new StringReader(textMessage.getText()));
}
return getXStream().unmarshal(in);
}
}