From 8858dc294cf3f992fc77053d9fc92f0422cf6605 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 18 Feb 2015 15:47:40 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5596 Remove the deprecated JMS Streams code. --- .../apache/activemq/ActiveMQConnection.java | 150 +------- .../apache/activemq/ActiveMQInputStream.java | 326 ------------------ .../apache/activemq/ActiveMQOutputStream.java | 222 ------------ .../org/apache/activemq/StreamConnection.java | 80 ----- .../activemq/ActiveMQInputStreamTest.java | 145 -------- .../apache/activemq/LargeStreamletTest.java | 162 --------- .../activemq/streams/JMSInputStreamTest.java | 281 --------------- 7 files changed, 1 insertion(+), 1365 deletions(-) delete mode 100644 activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java delete mode 100644 activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java delete mode 100644 activemq-client/src/main/java/org/apache/activemq/StreamConnection.java delete mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java delete mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java delete mode 100755 activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index aef8513943..51122cafda 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -17,8 +17,6 @@ package org.apache.activemq; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; @@ -38,7 +36,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; -import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; @@ -108,7 +105,7 @@ import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection { +public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection { public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; @@ -173,9 +170,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList connectionConsumers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList transportListeners = new CopyOnWriteArrayList(); - // Stream are deprecated and will be removed in a later release. - private final CopyOnWriteArrayList inputStreams = new CopyOnWriteArrayList(); - private final CopyOnWriteArrayList outputStreams = new CopyOnWriteArrayList(); // Maps ConsumerIds to ActiveMQConsumer objects private final ConcurrentHashMap dispatchers = new ConcurrentHashMap(); @@ -183,7 +177,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); private final SessionId connectionSessionId; private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); - private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); @@ -681,15 +674,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon ActiveMQConnectionConsumer c = i.next(); c.dispose(); } - // Stream are deprecated and will be removed in a later release. - for (Iterator i = this.inputStreams.iterator(); i.hasNext();) { - ActiveMQInputStream c = i.next(); - c.dispose(); - } - for (Iterator i = this.outputStreams.iterator(); i.hasNext();) { - ActiveMQOutputStream c = i.next(); - c.dispose(); - } this.activeTempDestinations.clear(); @@ -1267,13 +1251,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); } - /** - * @return - */ - private ProducerId createProducerId() { - return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId()); - } - /** * Creates a QueueSession object. * @@ -1610,16 +1587,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon c.dispose(); } - // Stream are deprecated and will be removed in a later release. - for (Iterator i = this.inputStreams.iterator(); i.hasNext();) { - ActiveMQInputStream c = i.next(); - c.dispose(); - } - for (Iterator i = this.outputStreams.iterator(); i.hasNext();) { - ActiveMQOutputStream c = i.next(); - c.dispose(); - } - if (isConnectionInfoSentToBroker) { if (!transportFailed.get() && !closing.get()) { syncSendPacket(info.createRemoveCommand()); @@ -2199,100 +2166,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.objectMessageSerializationDefered = objectMessageSerializationDefered; } - @Override - @Deprecated - public InputStream createInputStream(Destination dest) throws JMSException { - return createInputStream(dest, null); - } - - @Override - @Deprecated - public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { - return createInputStream(dest, messageSelector, false); - } - - @Override - @Deprecated - public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { - return createInputStream(dest, messageSelector, noLocal, -1); - } - - @Override - @Deprecated - public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { - return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); - } - - @Override - @Deprecated - public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { - return createInputStream(dest, null, false); - } - - @Override - @Deprecated - public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { - return createDurableInputStream(dest, name, messageSelector, false); - } - - @Override - @Deprecated - public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { - return createDurableInputStream(dest, name, messageSelector, noLocal, -1); - } - - @Override - @Deprecated - public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { - return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); - } - - @Deprecated - private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { - checkClosedOrFailed(); - ensureConnectionInfoSent(); - return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout); - } - - /** - * Creates a persistent output stream; individual messages will be written - * to disk/database by the broker - */ - @Override - @Deprecated - public OutputStream createOutputStream(Destination dest) throws JMSException { - return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); - } - - /** - * Creates a non persistent output stream; messages will not be written to - * disk - */ - @Deprecated - public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { - return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); - } - - /** - * Creates an output stream allowing full control over the delivery mode, - * the priority and time to live of the messages and the properties added to - * messages on the stream. - * - * @param streamProperties defines a map of key-value pairs where the keys - * are strings and the values are primitive values (numbers - * and strings) which are appended to the messages similarly - * to using the - * {@link javax.jms.Message#setObjectProperty(String, Object)} - * method - */ - @Override - @Deprecated - public OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkClosedOrFailed(); - ensureConnectionInfoSent(); - return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive); - } - /** * Unsubscribes a durable subscription that has been created by a client. *

@@ -2312,7 +2185,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * specified. * @since 1.1 */ - @Override public void unsubscribe(String name) throws InvalidDestinationException, JMSException { checkClosedOrFailed(); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); @@ -2364,26 +2236,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } - @Deprecated - public void addOutputStream(ActiveMQOutputStream stream) { - outputStreams.add(stream); - } - - @Deprecated - public void removeOutputStream(ActiveMQOutputStream stream) { - outputStreams.remove(stream); - } - - @Deprecated - public void addInputStream(ActiveMQInputStream stream) { - inputStreams.add(stream); - } - - @Deprecated - public void removeInputStream(ActiveMQInputStream stream) { - inputStreams.remove(stream); - } - protected void onControlCommand(ControlCommand command) { String text = command.getCommand(); if (text != null) { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java deleted file mode 100644 index a8954217fa..0000000000 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java +++ /dev/null @@ -1,326 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; - -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.CommandTypes; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.selector.SelectorParser; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.JMSExceptionSupport; - -/** - * - */ -@Deprecated -public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher { - - private final ActiveMQConnection connection; - private final ConsumerInfo info; - // These are the messages waiting to be delivered to the client - private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel(); - - private int deliveredCounter; - private MessageDispatch lastDelivered; - private boolean eosReached; - private byte buffer[]; - private int pos; - private Map jmsProperties; - - private ProducerId producerId; - private long nextSequenceId; - private final long timeout; - private boolean firstReceived; - - public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout) - throws JMSException { - this.connection = connection; - - if (dest == null) { - throw new InvalidDestinationException("Don't understand null destinations"); - } else if (dest.isTemporary()) { - String physicalName = dest.getPhysicalName(); - - if (physicalName == null) { - throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); - } - - String connectionID = connection.getConnectionInfo().getConnectionId().getValue(); - - if (physicalName.indexOf(connectionID) < 0) { - throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); - } - - if (connection.isDeleted(dest)) { - throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); - } - } - - if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1"); - this.timeout = timeout; - - this.info = new ConsumerInfo(consumerId); - this.info.setSubscriptionName(name); - - if (selector != null && selector.trim().length() != 0) { - selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) "; - } else { - selector = "JMSType='org.apache.activemq.Stream'"; - } - - SelectorParser.parse(selector); - this.info.setSelector(selector); - - this.info.setPrefetchSize(prefetch); - this.info.setNoLocal(noLocal); - this.info.setBrowser(false); - this.info.setDispatchAsync(false); - - // Allows the options on the destination to configure the consumerInfo - if (dest.getOptions() != null) { - Map options = new HashMap(dest.getOptions()); - IntrospectionSupport.setProperties(this.info, options, "consumer."); - } - - this.info.setDestination(dest); - - this.connection.addInputStream(this); - this.connection.addDispatcher(info.getConsumerId(), this); - this.connection.syncSendPacket(info); - unconsumedMessages.start(); - } - - @Override - public void close() throws IOException { - if (!unconsumedMessages.isClosed()) { - try { - if (lastDelivered != null) { - MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); - connection.asyncSendPacket(ack); - } - dispose(); - this.connection.syncSendPacket(info.createRemoveCommand()); - } catch (JMSException e) { - throw IOExceptionSupport.create(e); - } - } - } - - public void dispose() { - if (!unconsumedMessages.isClosed()) { - unconsumedMessages.close(); - this.connection.removeDispatcher(info.getConsumerId()); - this.connection.removeInputStream(this); - } - } - - /** - * Return the JMS Properties which where used to send the InputStream - * - * @return jmsProperties - * @throws IOException - */ - public Map getJMSProperties() throws IOException { - if (jmsProperties == null) { - fillBuffer(); - } - return jmsProperties; - } - - /** - * This method allows the client to receive the Stream data as unaltered ActiveMQMessage - * object which is how the split stream data is sent. Each message will contains one - * chunk of the written bytes as well as a valid message group sequence id. The EOS - * message will have a message group sequence id of -1. - * - * This method is useful for testing, but should never be mixed with calls to the - * normal stream receive methods as it will break the normal stream processing flow - * and can lead to loss of data. - * - * @return an ActiveMQMessage object that either contains byte data or an end of strem - * marker. - * @throws JMSException - * @throws ReadTimeoutException - */ - public ActiveMQMessage receive() throws JMSException, ReadTimeoutException { - checkClosed(); - MessageDispatch md; - try { - if (firstReceived || timeout == -1) { - md = unconsumedMessages.dequeue(-1); - firstReceived = true; - } else { - md = unconsumedMessages.dequeue(timeout); - if (md == null) throw new ReadTimeoutException(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw JMSExceptionSupport.create(e); - } - - if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) { - return null; - } - - deliveredCounter++; - if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) { - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); - connection.asyncSendPacket(ack); - deliveredCounter = 0; - lastDelivered = null; - } else { - lastDelivered = md; - } - - return (ActiveMQMessage)md.getMessage(); - } - - /** - * @throws IllegalStateException - */ - protected void checkClosed() throws IllegalStateException { - if (unconsumedMessages.isClosed()) { - throw new IllegalStateException("The Consumer is closed"); - } - } - - /** - * - * @see InputStream#read() - * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout - */ - @Override - public int read() throws IOException { - fillBuffer(); - if (eosReached || buffer.length == 0) { - return -1; - } - - return buffer[pos++] & 0xff; - } - - /** - * - * @see InputStream#read(byte[], int, int) - * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout - */ - @Override - public int read(byte[] b, int off, int len) throws IOException { - fillBuffer(); - if (eosReached || buffer.length == 0) { - return -1; - } - - int max = Math.min(len, buffer.length - pos); - System.arraycopy(buffer, pos, b, off, max); - - pos += max; - return max; - } - - private void fillBuffer() throws IOException { - if (eosReached || (buffer != null && buffer.length > pos)) { - return; - } - try { - while (true) { - ActiveMQMessage m = receive(); - if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) { - // First message. - long producerSequenceId = m.getMessageId().getProducerSequenceId(); - if (producerId == null) { - // We have to start a stream at sequence id = 0 - if (producerSequenceId != 0) { - continue; - } - nextSequenceId++; - producerId = m.getMessageId().getProducerId(); - } else { - // Verify it's the next message of the sequence. - if (!m.getMessageId().getProducerId().equals(producerId)) { - throw new IOException("Received an unexpected message: invalid producer: " + m); - } - if (producerSequenceId != nextSequenceId++) { - throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m); - } - } - - // Read the buffer in. - ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m; - buffer = new byte[(int)bm.getBodyLength()]; - bm.readBytes(buffer); - pos = 0; - if (jmsProperties == null) { - jmsProperties = Collections.unmodifiableMap(new HashMap(bm.getProperties())); - } - } else { - eosReached = true; - if (jmsProperties == null) { - // no properties found - jmsProperties = Collections.emptyMap(); - } - } - return; - } - } catch (JMSException e) { - eosReached = true; - if (jmsProperties == null) { - // no properties found - jmsProperties = Collections.emptyMap(); - } - throw IOExceptionSupport.create(e); - } - } - - @Override - public void dispatch(MessageDispatch md) { - unconsumedMessages.enqueue(md); - } - - @Override - public String toString() { - return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }"; - } - - /** - * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout - */ - public class ReadTimeoutException extends IOException { - private static final long serialVersionUID = -3217758894326719909L; - - public ReadTimeoutException() { - super(); - } - } -} diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java deleted file mode 100644 index c6e400d2ba..0000000000 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; - -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IntrospectionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated -public class ActiveMQOutputStream extends OutputStream implements Disposable { - - private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class); - - protected int count; - - final byte buffer[]; - - private final ActiveMQConnection connection; - private final Map properties; - private final ProducerInfo info; - - private long messageSequence; - private boolean closed; - private final int deliveryMode; - private final int priority; - private final long timeToLive; - private boolean alwaysSyncSend = false; - private boolean addPropertiesOnFirstMsgOnly = false; - - /** - * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb - */ - public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE"; - - public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map properties, int deliveryMode, int priority, - long timeToLive) throws JMSException { - this.connection = connection; - this.deliveryMode = deliveryMode; - this.priority = priority; - this.timeToLive = timeToLive; - this.properties = properties == null ? null : new HashMap(properties); - - Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE); - if (chunkSize == null) { - chunkSize = 64 * 1024; - } else { - if (chunkSize < 1) { - throw new IllegalArgumentException("Chunk size must be greater then 0"); - } else { - chunkSize *= 1024; - } - } - - buffer = new byte[chunkSize]; - - if (destination == null) { - throw new InvalidDestinationException("Don't understand null destinations"); - } - - this.info = new ProducerInfo(producerId); - - // Allows the options on the destination to configure the stream - if (destination.getOptions() != null) { - Map options = new HashMap(destination.getOptions()); - IntrospectionSupport.setProperties(this, options, "producer."); - IntrospectionSupport.setProperties(this.info, options, "producer."); - if (options.size() > 0) { - String msg = "There are " + options.size() - + " producer options that couldn't be set on the producer." - + " Check the options are spelled correctly." - + " Unknown parameters=[" + options + "]." - + " This producer cannot be started."; - LOG.warn(msg); - throw new ConfigurationException(msg); - } - } - - this.info.setDestination(destination); - - this.connection.addOutputStream(this); - this.connection.asyncSendPacket(info); - } - - @Override - public void close() throws IOException { - if (!closed) { - flushBuffer(); - try { - // Send an EOS style empty message to signal EOS. - send(new ActiveMQMessage(), true); - dispose(); - this.connection.asyncSendPacket(info.createRemoveCommand()); - } catch (JMSException e) { - IOExceptionSupport.create(e); - } - } - } - - @Override - public void dispose() { - if (!closed) { - this.connection.removeOutputStream(this); - closed = true; - } - } - - @Override - public synchronized void write(int b) throws IOException { - buffer[count++] = (byte) b; - if (count == buffer.length) { - flushBuffer(); - } - } - - @Override - public synchronized void write(byte b[], int off, int len) throws IOException { - while (len > 0) { - int max = Math.min(len, buffer.length - count); - System.arraycopy(b, off, buffer, count, max); - - len -= max; - count += max; - off += max; - - if (count == buffer.length) { - flushBuffer(); - } - } - } - - @Override - public synchronized void flush() throws IOException { - flushBuffer(); - } - - private void flushBuffer() throws IOException { - if (count != 0) { - try { - ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); - msg.writeBytes(buffer, 0, count); - send(msg, false); - } catch (JMSException e) { - throw IOExceptionSupport.create(e); - } - count = 0; - } - } - - /** - * @param msg - * @throws JMSException - */ - private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException { - if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) { - for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) { - String key = iter.next(); - Object value = properties.get(key); - msg.setObjectProperty(key, value); - } - } - msg.setType("org.apache.activemq.Stream"); - msg.setGroupID(info.getProducerId().toString()); - if (eosMessage) { - msg.setGroupSequence(-1); - } else { - msg.setGroupSequence((int) messageSequence); - } - MessageId id = new MessageId(info.getProducerId(), messageSequence++); - connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend()); - } - - @Override - public String toString() { - return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }"; - } - - public boolean isAlwaysSyncSend() { - return alwaysSyncSend; - } - - public void setAlwaysSyncSend(boolean alwaysSyncSend) { - this.alwaysSyncSend = alwaysSyncSend; - } - - public boolean isAddPropertiesOnFirstMsgOnly() { - return addPropertiesOnFirstMsgOnly; - } - - public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) { - this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly; - } -} diff --git a/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java b/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java deleted file mode 100644 index 2f75b4f39d..0000000000 --- a/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Topic; - -/** - * The StreamConnection interface allows you to send and receive data from a - * Destination in using standard java InputStream and OutputStream objects. It's - * best use case is to send and receive large amounts of data that would be to - * large to hold in a single JMS message. - * - * - */ -@Deprecated -public interface StreamConnection extends Connection { - - InputStream createInputStream(Destination dest) throws JMSException; - - InputStream createInputStream(Destination dest, String messageSelector) throws JMSException; - - InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException; - - InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException; - - InputStream createDurableInputStream(Topic dest, String name) throws JMSException; - - InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException; - - InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException; - - InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException; - - OutputStream createOutputStream(Destination dest) throws JMSException; - - OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException; - - /** - * Unsubscribes a durable subscription that has been created by a client. - *

- * This method deletes the state being maintained on behalf of the - * subscriber by its provider. - *

- * It is erroneous for a client to delete a durable subscription while there - * is an active MessageConsumer or - * TopicSubscriber for the subscription, or while a consumed - * message is part of a pending transaction or has not been acknowledged in - * the session. - * - * @param name the name used to identify this subscription - * @throws JMSException if the session fails to unsubscribe to the durable - * subscription due to some internal error. - * @throws InvalidDestinationException if an invalid subscription name is - * specified. - * @since 1.1 - */ - void unsubscribe(String name) throws JMSException; -} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java deleted file mode 100644 index 77f422e159..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.io.InputStream; -import java.io.OutputStream; - -import javax.jms.Queue; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQDestination; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated -public class ActiveMQInputStreamTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class); - - private static final String BROKER_URL = "tcp://localhost:0"; - private static final String DESTINATION = "destination"; - private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash - - private BrokerService broker; - private String connectionUri; - - @Override - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setUseJmx(false); - broker.setPersistent(false); - broker.setDestinations(new ActiveMQDestination[] { - ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE), - }); - broker.addConnector(BROKER_URL); - broker.start(); - broker.waitUntilStarted(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - - @Override - public void tearDown() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - public void testInputStreamSetSyncSendOption() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true"); - - OutputStream out = null; - try { - out = connection.createOutputStream(destination); - - assertTrue(((ActiveMQOutputStream)out).isAlwaysSyncSend()); - - LOG.debug("writing..."); - for (int i = 0; i < STREAM_LENGTH; ++i) { - out.write(0); - } - LOG.debug("wrote " + STREAM_LENGTH + " bytes"); - } finally { - if (out != null) { - out.close(); - } - } - - InputStream in = null; - try { - in = connection.createInputStream(destination); - LOG.debug("reading..."); - int count = 0; - while (-1 != in.read()) { - ++count; - } - LOG.debug("read " + count + " bytes"); - } finally { - if (in != null) { - in.close(); - } - } - - connection.close(); - } - - public void testInputStreamMatchesDefaultChuckSize() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(DESTINATION); - - OutputStream out = null; - try { - out = connection.createOutputStream(destination); - LOG.debug("writing..."); - for (int i = 0; i < STREAM_LENGTH; ++i) { - out.write(0); - } - LOG.debug("wrote " + STREAM_LENGTH + " bytes"); - } finally { - if (out != null) { - out.close(); - } - } - - InputStream in = null; - try { - in = connection.createInputStream(destination); - LOG.debug("reading..."); - int count = 0; - while (-1 != in.read()) { - ++count; - } - LOG.debug("read " + count + " bytes"); - } finally { - if (in != null) { - in.close(); - } - } - - connection.close(); - } -} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java deleted file mode 100644 index d624d368c5..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Destination; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author rnewson - */ -public final class LargeStreamletTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class); - private static final String BROKER_URL = "vm://localhost?broker.persistent=false"; - private static final int BUFFER_SIZE = 1 * 1024; - private static final int MESSAGE_COUNT = 10 * 1024; - - protected Exception writerException; - protected Exception readerException; - - private final AtomicInteger totalRead = new AtomicInteger(); - private final AtomicInteger totalWritten = new AtomicInteger(); - private final AtomicBoolean stopThreads = new AtomicBoolean(false); - - public void testStreamlets() throws Exception { - final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); - - final ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection(); - connection.start(); - try { - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - final Destination destination = session.createQueue("wibble"); - final Thread readerThread = new Thread(new Runnable() { - - @Override - public void run() { - totalRead.set(0); - try { - final InputStream inputStream = connection.createInputStream(destination); - try { - int read; - final byte[] buf = new byte[BUFFER_SIZE]; - while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) { - totalRead.addAndGet(read); - } - } finally { - inputStream.close(); - } - } catch (Exception e) { - readerException = e; - e.printStackTrace(); - } finally { - LOG.info(totalRead + " total bytes read."); - } - } - }); - - final Thread writerThread = new Thread(new Runnable() { - private final Random random = new Random(); - - @Override - public void run() { - totalWritten.set(0); - int count = MESSAGE_COUNT; - try { - final OutputStream outputStream = connection.createOutputStream(destination); - try { - final byte[] buf = new byte[BUFFER_SIZE]; - random.nextBytes(buf); - while (count > 0 && !stopThreads.get()) { - outputStream.write(buf); - totalWritten.addAndGet(buf.length); - count--; - } - } finally { - outputStream.close(); - } - } catch (Exception e) { - writerException = e; - e.printStackTrace(); - } finally { - LOG.info(totalWritten + " total bytes written."); - } - } - }); - - readerThread.start(); - writerThread.start(); - - // Wait till reader is has finished receiving all the messages - // or he has stopped - // receiving messages. - Thread.sleep(1000); - int lastRead = totalRead.get(); - while (readerThread.isAlive()) { - readerThread.join(1000); - // No progress?? then stop waiting.. - if (lastRead == totalRead.get()) { - break; - } - lastRead = totalRead.get(); - } - - stopThreads.set(true); - - assertTrue("Should not have received a reader exception", readerException == null); - assertTrue("Should not have received a writer exception", writerException == null); - - assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get()); - - } finally { - session.close(); - } - } finally { - connection.close(); - } - } - -} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java deleted file mode 100755 index f3926628f1..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.streams; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQInputStream; -import org.apache.activemq.ActiveMQOutputStream; -import org.apache.activemq.JmsTestSupport; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; - -/** - * JMSInputStreamTest - */ -@Deprecated -public class JMSInputStreamTest extends JmsTestSupport { - - public Destination destination; - protected DataOutputStream out; - protected DataInputStream in; - private ActiveMQConnection connection2; - - private ActiveMQInputStream amqIn; - private ActiveMQOutputStream amqOut; - - public static Test suite() { - return suite(JMSInputStreamTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - public void initCombos() { - addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC") }); - } - - @Override - protected void setUp() throws Exception { - super.setAutoFail(true); - super.setUp(); - } - - private void setUpConnection(Map props, long timeout) throws JMSException { - connection2 = (ActiveMQConnection) factory.createConnection(userName, password); - connections.add(connection2); - if (props != null) { - amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - } else { - amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination); - } - - out = new DataOutputStream(amqOut); - if (timeout == -1) { - amqIn = (ActiveMQInputStream) connection2.createInputStream(destination); - } else { - amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout); - } - in = new DataInputStream(amqIn); - } - - /* - * @see TestCase#tearDown() - */ - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - /** - * Test for AMQ-3010 - */ - public void testInputStreamTimeout() throws Exception { - long timeout = 500; - - setUpConnection(null, timeout); - try { - in.read(); - fail(); - } catch (ActiveMQInputStream.ReadTimeoutException e) { - // timeout reached, everything ok - } - in.close(); - } - - // Test for AMQ-2988 - public void testStreamsWithProperties() throws Exception { - String name1 = "PROPERTY_1"; - String name2 = "PROPERTY_2"; - String value1 = "VALUE_1"; - String value2 = "VALUE_2"; - Map jmsProperties = new HashMap(); - jmsProperties.put(name1, value1); - jmsProperties.put(name2, value2); - setUpConnection(jmsProperties, -1); - - out.writeInt(4); - out.flush(); - assertTrue(in.readInt() == 4); - out.writeFloat(2.3f); - out.flush(); - assertTrue(in.readFloat() == 2.3f); - String str = "this is a test string"; - out.writeUTF(str); - out.flush(); - assertTrue(in.readUTF().equals(str)); - for (int i = 0; i < 100; i++) { - out.writeLong(i); - } - out.flush(); - - // check properties before we try to read the stream - checkProperties(jmsProperties); - - for (int i = 0; i < 100; i++) { - assertTrue(in.readLong() == i); - } - - // check again after read was done - checkProperties(jmsProperties); - } - - public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception { - String name1 = "PROPERTY_1"; - String name2 = "PROPERTY_2"; - String value1 = "VALUE_1"; - String value2 = "VALUE_2"; - Map jmsProperties = new HashMap(); - jmsProperties.put(name1, value1); - jmsProperties.put(name2, value2); - - ActiveMQDestination dest = (ActiveMQDestination) destination; - - if (dest.isQueue()) { - destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true"); - } else { - destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true"); - } - - setUpConnection(jmsProperties, -1); - - assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly()); - - out.writeInt(4); - out.flush(); - assertTrue(in.readInt() == 4); - out.writeFloat(2.3f); - out.flush(); - assertTrue(in.readFloat() == 2.3f); - String str = "this is a test string"; - out.writeUTF(str); - out.flush(); - assertTrue(in.readUTF().equals(str)); - for (int i = 0; i < 100; i++) { - out.writeLong(i); - } - out.flush(); - - // check properties before we try to read the stream - checkProperties(jmsProperties); - - for (int i = 0; i < 100; i++) { - assertTrue(in.readLong() == i); - } - - // check again after read was done - checkProperties(jmsProperties); - } - - // check if the received stream has the properties set - // Test for AMQ-2988 - private void checkProperties(Map jmsProperties) throws IOException { - Map receivedJmsProps = amqIn.getJMSProperties(); - - // we should at least have the same amount or more properties - assertTrue(jmsProperties.size() <= receivedJmsProps.size()); - - // check the properties to see if we have everything in there - Iterator propsIt = jmsProperties.keySet().iterator(); - while (propsIt.hasNext()) { - String key = propsIt.next(); - assertTrue(receivedJmsProps.containsKey(key)); - assertEquals(jmsProperties.get(key), receivedJmsProps.get(key)); - } - } - - public void testLarge() throws Exception { - setUpConnection(null, -1); - - final int testData = 23; - final int dataLength = 4096; - final int count = 1024; - byte[] data = new byte[dataLength]; - for (int i = 0; i < data.length; i++) { - data[i] = testData; - } - final AtomicBoolean complete = new AtomicBoolean(false); - Thread runner = new Thread(new Runnable() { - @Override - public void run() { - try { - for (int x = 0; x < count; x++) { - byte[] b = new byte[2048]; - in.readFully(b); - for (int i = 0; i < b.length; i++) { - assertTrue(b[i] == testData); - } - } - complete.set(true); - synchronized (complete) { - complete.notify(); - } - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); - runner.start(); - for (int i = 0; i < count; i++) { - out.write(data); - } - out.flush(); - synchronized (complete) { - if (!complete.get()) { - complete.wait(30000); - } - } - assertTrue(complete.get()); - } - - public void testStreams() throws Exception { - setUpConnection(null, -1); - out.writeInt(4); - out.flush(); - assertTrue(in.readInt() == 4); - out.writeFloat(2.3f); - out.flush(); - assertTrue(in.readFloat() == 2.3f); - String str = "this is a test string"; - out.writeUTF(str); - out.flush(); - assertTrue(in.readUTF().equals(str)); - for (int i = 0; i < 100; i++) { - out.writeLong(i); - } - out.flush(); - - for (int i = 0; i < 100; i++) { - assertTrue(in.readLong() == i); - } - } -}