Deprecated the streams functionality.  Remove in a later release.
This commit is contained in:
Timothy Bish 2014-01-21 15:00:16 -05:00
parent e366917efc
commit 90104943b2
6 changed files with 33 additions and 5 deletions

View File

@ -170,9 +170,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final AtomicBoolean transportFailed = new AtomicBoolean(false); private final AtomicBoolean transportFailed = new AtomicBoolean(false);
private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
// Stream are deprecated and will be removed in a later release.
private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>(); private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>(); private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
// Maps ConsumerIds to ActiveMQConsumer objects // Maps ConsumerIds to ActiveMQConsumer objects
private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
@ -677,6 +678,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
ActiveMQConnectionConsumer c = i.next(); ActiveMQConnectionConsumer c = i.next();
c.dispose(); c.dispose();
} }
// Stream are deprecated and will be removed in a later release.
for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = i.next(); ActiveMQInputStream c = i.next();
c.dispose(); c.dispose();
@ -1598,6 +1600,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
ActiveMQConnectionConsumer c = i.next(); ActiveMQConnectionConsumer c = i.next();
c.dispose(); c.dispose();
} }
// Stream are deprecated and will be removed in a later release.
for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = i.next(); ActiveMQInputStream c = i.next();
c.dispose(); c.dispose();
@ -2186,45 +2190,54 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
@Override @Override
@Deprecated
public InputStream createInputStream(Destination dest) throws JMSException { public InputStream createInputStream(Destination dest) throws JMSException {
return createInputStream(dest, null); return createInputStream(dest, null);
} }
@Override @Override
@Deprecated
public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
return createInputStream(dest, messageSelector, false); return createInputStream(dest, messageSelector, false);
} }
@Override @Override
@Deprecated
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
return createInputStream(dest, messageSelector, noLocal, -1); return createInputStream(dest, messageSelector, noLocal, -1);
} }
@Override @Override
@Deprecated
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
} }
@Override @Override
@Deprecated
public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
return createInputStream(dest, null, false); return createInputStream(dest, null, false);
} }
@Override @Override
@Deprecated
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
return createDurableInputStream(dest, name, messageSelector, false); return createDurableInputStream(dest, name, messageSelector, false);
} }
@Override @Override
@Deprecated
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
return createDurableInputStream(dest, name, messageSelector, noLocal, -1); return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
} }
@Override @Override
@Deprecated
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
} }
@Deprecated
private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
checkClosedOrFailed(); checkClosedOrFailed();
ensureConnectionInfoSent(); ensureConnectionInfoSent();
@ -2236,6 +2249,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* to disk/database by the broker * to disk/database by the broker
*/ */
@Override @Override
@Deprecated
public OutputStream createOutputStream(Destination dest) throws JMSException { public OutputStream createOutputStream(Destination dest) throws JMSException {
return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
} }
@ -2244,6 +2258,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* Creates a non persistent output stream; messages will not be written to * Creates a non persistent output stream; messages will not be written to
* disk * disk
*/ */
@Deprecated
public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
} }
@ -2261,6 +2276,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* method * method
*/ */
@Override @Override
@Deprecated
public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
checkClosedOrFailed(); checkClosedOrFailed();
ensureConnectionInfoSent(); ensureConnectionInfoSent();
@ -2338,18 +2354,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
} }
@Deprecated
public void addOutputStream(ActiveMQOutputStream stream) { public void addOutputStream(ActiveMQOutputStream stream) {
outputStreams.add(stream); outputStreams.add(stream);
} }
@Deprecated
public void removeOutputStream(ActiveMQOutputStream stream) { public void removeOutputStream(ActiveMQOutputStream stream) {
outputStreams.remove(stream); outputStreams.remove(stream);
} }
@Deprecated
public void addInputStream(ActiveMQInputStream stream) { public void addInputStream(ActiveMQInputStream stream) {
inputStreams.add(stream); inputStreams.add(stream);
} }
@Deprecated
public void removeInputStream(ActiveMQInputStream stream) { public void removeInputStream(ActiveMQInputStream stream) {
inputStreams.remove(stream); inputStreams.remove(stream);
} }

View File

@ -43,6 +43,7 @@ import org.apache.activemq.util.JMSExceptionSupport;
/** /**
* *
*/ */
@Deprecated
public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher { public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
private final ActiveMQConnection connection; private final ActiveMQConnection connection;

View File

@ -36,6 +36,7 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@Deprecated
public class ActiveMQOutputStream extends OutputStream implements Disposable { public class ActiveMQOutputStream extends OutputStream implements Disposable {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class); private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);

View File

@ -34,6 +34,7 @@ import javax.jms.Topic;
* *
* *
*/ */
@Deprecated
public interface StreamConnection extends Connection { public interface StreamConnection extends Connection {
InputStream createInputStream(Destination dest) throws JMSException; InputStream createInputStream(Destination dest) throws JMSException;

View File

@ -23,11 +23,13 @@ import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@Deprecated
public class ActiveMQInputStreamTest extends TestCase { public class ActiveMQInputStreamTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class); private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class);
@ -39,6 +41,7 @@ public class ActiveMQInputStreamTest extends TestCase {
private BrokerService broker; private BrokerService broker;
private String connectionUri; private String connectionUri;
@Override
public void setUp() throws Exception { public void setUp() throws Exception {
broker = new BrokerService(); broker = new BrokerService();
broker.setUseJmx(false); broker.setUseJmx(false);
@ -53,6 +56,7 @@ public class ActiveMQInputStreamTest extends TestCase {
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
} }
@Override
public void tearDown() throws Exception { public void tearDown() throws Exception {
broker.stop(); broker.stop();
broker.waitUntilStopped(); broker.waitUntilStopped();

View File

@ -41,6 +41,7 @@ import org.apache.activemq.command.ActiveMQTopic;
/** /**
* JMSInputStreamTest * JMSInputStreamTest
*/ */
@Deprecated
public class JMSInputStreamTest extends JmsTestSupport { public class JMSInputStreamTest extends JmsTestSupport {
public Destination destination; public Destination destination;