Remove the deprecated JMS Streams code.
This commit is contained in:
Timothy Bish 2015-02-18 15:47:40 -05:00
parent e6597c4604
commit 8858dc294c
7 changed files with 1 additions and 1365 deletions

View File

@ -17,8 +17,6 @@
package org.apache.activemq; package org.apache.activemq;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.HashMap; import java.util.HashMap;
@ -38,7 +36,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionConsumer; import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData; import javax.jms.ConnectionMetaData;
import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
@ -108,7 +105,7 @@ import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection { public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
@ -173,9 +170,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
// Stream are deprecated and will be removed in a later release.
private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
// Maps ConsumerIds to ActiveMQConsumer objects // Maps ConsumerIds to ActiveMQConsumer objects
private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
@ -183,7 +177,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
private final SessionId connectionSessionId; private final SessionId connectionSessionId;
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
@ -681,15 +674,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
ActiveMQConnectionConsumer c = i.next(); ActiveMQConnectionConsumer c = i.next();
c.dispose(); c.dispose();
} }
// Stream are deprecated and will be removed in a later release.
for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = i.next();
c.dispose();
}
for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = i.next();
c.dispose();
}
this.activeTempDestinations.clear(); this.activeTempDestinations.clear();
@ -1267,13 +1251,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
} }
/**
* @return
*/
private ProducerId createProducerId() {
return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
}
/** /**
* Creates a <CODE>QueueSession</CODE> object. * Creates a <CODE>QueueSession</CODE> object.
* *
@ -1610,16 +1587,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
c.dispose(); c.dispose();
} }
// Stream are deprecated and will be removed in a later release.
for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
ActiveMQInputStream c = i.next();
c.dispose();
}
for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
ActiveMQOutputStream c = i.next();
c.dispose();
}
if (isConnectionInfoSentToBroker) { if (isConnectionInfoSentToBroker) {
if (!transportFailed.get() && !closing.get()) { if (!transportFailed.get() && !closing.get()) {
syncSendPacket(info.createRemoveCommand()); syncSendPacket(info.createRemoveCommand());
@ -2199,100 +2166,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.objectMessageSerializationDefered = objectMessageSerializationDefered; this.objectMessageSerializationDefered = objectMessageSerializationDefered;
} }
@Override
@Deprecated
public InputStream createInputStream(Destination dest) throws JMSException {
return createInputStream(dest, null);
}
@Override
@Deprecated
public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
return createInputStream(dest, messageSelector, false);
}
@Override
@Deprecated
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
return createInputStream(dest, messageSelector, noLocal, -1);
}
@Override
@Deprecated
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
}
@Override
@Deprecated
public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
return createInputStream(dest, null, false);
}
@Override
@Deprecated
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
return createDurableInputStream(dest, name, messageSelector, false);
}
@Override
@Deprecated
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
}
@Override
@Deprecated
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
}
@Deprecated
private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
}
/**
* Creates a persistent output stream; individual messages will be written
* to disk/database by the broker
*/
@Override
@Deprecated
public OutputStream createOutputStream(Destination dest) throws JMSException {
return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
}
/**
* Creates a non persistent output stream; messages will not be written to
* disk
*/
@Deprecated
public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
}
/**
* Creates an output stream allowing full control over the delivery mode,
* the priority and time to live of the messages and the properties added to
* messages on the stream.
*
* @param streamProperties defines a map of key-value pairs where the keys
* are strings and the values are primitive values (numbers
* and strings) which are appended to the messages similarly
* to using the
* {@link javax.jms.Message#setObjectProperty(String, Object)}
* method
*/
@Override
@Deprecated
public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
}
/** /**
* Unsubscribes a durable subscription that has been created by a client. * Unsubscribes a durable subscription that has been created by a client.
* <P> * <P>
@ -2312,7 +2185,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* specified. * specified.
* @since 1.1 * @since 1.1
*/ */
@Override
public void unsubscribe(String name) throws InvalidDestinationException, JMSException { public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
checkClosedOrFailed(); checkClosedOrFailed();
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
@ -2364,26 +2236,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
} }
@Deprecated
public void addOutputStream(ActiveMQOutputStream stream) {
outputStreams.add(stream);
}
@Deprecated
public void removeOutputStream(ActiveMQOutputStream stream) {
outputStreams.remove(stream);
}
@Deprecated
public void addInputStream(ActiveMQInputStream stream) {
inputStreams.add(stream);
}
@Deprecated
public void removeInputStream(ActiveMQInputStream stream) {
inputStreams.remove(stream);
}
protected void onControlCommand(ControlCommand command) { protected void onControlCommand(ControlCommand command) {
String text = command.getCommand(); String text = command.getCommand();
if (text != null) { if (text != null) {

View File

@ -1,326 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
/**
*
*/
@Deprecated
public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
private final ActiveMQConnection connection;
private final ConsumerInfo info;
// These are the messages waiting to be delivered to the client
private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
private int deliveredCounter;
private MessageDispatch lastDelivered;
private boolean eosReached;
private byte buffer[];
private int pos;
private Map<String, Object> jmsProperties;
private ProducerId producerId;
private long nextSequenceId;
private final long timeout;
private boolean firstReceived;
public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout)
throws JMSException {
this.connection = connection;
if (dest == null) {
throw new InvalidDestinationException("Don't understand null destinations");
} else if (dest.isTemporary()) {
String physicalName = dest.getPhysicalName();
if (physicalName == null) {
throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
}
String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
if (physicalName.indexOf(connectionID) < 0) {
throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
}
if (connection.isDeleted(dest)) {
throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
}
}
if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
this.timeout = timeout;
this.info = new ConsumerInfo(consumerId);
this.info.setSubscriptionName(name);
if (selector != null && selector.trim().length() != 0) {
selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
} else {
selector = "JMSType='org.apache.activemq.Stream'";
}
SelectorParser.parse(selector);
this.info.setSelector(selector);
this.info.setPrefetchSize(prefetch);
this.info.setNoLocal(noLocal);
this.info.setBrowser(false);
this.info.setDispatchAsync(false);
// Allows the options on the destination to configure the consumerInfo
if (dest.getOptions() != null) {
Map<String, String> options = new HashMap<String, String>(dest.getOptions());
IntrospectionSupport.setProperties(this.info, options, "consumer.");
}
this.info.setDestination(dest);
this.connection.addInputStream(this);
this.connection.addDispatcher(info.getConsumerId(), this);
this.connection.syncSendPacket(info);
unconsumedMessages.start();
}
@Override
public void close() throws IOException {
if (!unconsumedMessages.isClosed()) {
try {
if (lastDelivered != null) {
MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
connection.asyncSendPacket(ack);
}
dispose();
this.connection.syncSendPacket(info.createRemoveCommand());
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
}
}
public void dispose() {
if (!unconsumedMessages.isClosed()) {
unconsumedMessages.close();
this.connection.removeDispatcher(info.getConsumerId());
this.connection.removeInputStream(this);
}
}
/**
* Return the JMS Properties which where used to send the InputStream
*
* @return jmsProperties
* @throws IOException
*/
public Map<String, Object> getJMSProperties() throws IOException {
if (jmsProperties == null) {
fillBuffer();
}
return jmsProperties;
}
/**
* This method allows the client to receive the Stream data as unaltered ActiveMQMessage
* object which is how the split stream data is sent. Each message will contains one
* chunk of the written bytes as well as a valid message group sequence id. The EOS
* message will have a message group sequence id of -1.
*
* This method is useful for testing, but should never be mixed with calls to the
* normal stream receive methods as it will break the normal stream processing flow
* and can lead to loss of data.
*
* @return an ActiveMQMessage object that either contains byte data or an end of strem
* marker.
* @throws JMSException
* @throws ReadTimeoutException
*/
public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
checkClosed();
MessageDispatch md;
try {
if (firstReceived || timeout == -1) {
md = unconsumedMessages.dequeue(-1);
firstReceived = true;
} else {
md = unconsumedMessages.dequeue(timeout);
if (md == null) throw new ReadTimeoutException();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e);
}
if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
return null;
}
deliveredCounter++;
if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
connection.asyncSendPacket(ack);
deliveredCounter = 0;
lastDelivered = null;
} else {
lastDelivered = md;
}
return (ActiveMQMessage)md.getMessage();
}
/**
* @throws IllegalStateException
*/
protected void checkClosed() throws IllegalStateException {
if (unconsumedMessages.isClosed()) {
throw new IllegalStateException("The Consumer is closed");
}
}
/**
*
* @see InputStream#read()
* @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
*/
@Override
public int read() throws IOException {
fillBuffer();
if (eosReached || buffer.length == 0) {
return -1;
}
return buffer[pos++] & 0xff;
}
/**
*
* @see InputStream#read(byte[], int, int)
* @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
fillBuffer();
if (eosReached || buffer.length == 0) {
return -1;
}
int max = Math.min(len, buffer.length - pos);
System.arraycopy(buffer, pos, b, off, max);
pos += max;
return max;
}
private void fillBuffer() throws IOException {
if (eosReached || (buffer != null && buffer.length > pos)) {
return;
}
try {
while (true) {
ActiveMQMessage m = receive();
if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
// First message.
long producerSequenceId = m.getMessageId().getProducerSequenceId();
if (producerId == null) {
// We have to start a stream at sequence id = 0
if (producerSequenceId != 0) {
continue;
}
nextSequenceId++;
producerId = m.getMessageId().getProducerId();
} else {
// Verify it's the next message of the sequence.
if (!m.getMessageId().getProducerId().equals(producerId)) {
throw new IOException("Received an unexpected message: invalid producer: " + m);
}
if (producerSequenceId != nextSequenceId++) {
throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
}
}
// Read the buffer in.
ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
buffer = new byte[(int)bm.getBodyLength()];
bm.readBytes(buffer);
pos = 0;
if (jmsProperties == null) {
jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
}
} else {
eosReached = true;
if (jmsProperties == null) {
// no properties found
jmsProperties = Collections.emptyMap();
}
}
return;
}
} catch (JMSException e) {
eosReached = true;
if (jmsProperties == null) {
// no properties found
jmsProperties = Collections.emptyMap();
}
throw IOExceptionSupport.create(e);
}
}
@Override
public void dispatch(MessageDispatch md) {
unconsumedMessages.enqueue(md);
}
@Override
public String toString() {
return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
}
/**
* Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
*/
public class ReadTimeoutException extends IOException {
private static final long serialVersionUID = -3217758894326719909L;
public ReadTimeoutException() {
super();
}
}
}

View File

@ -1,222 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Deprecated
public class ActiveMQOutputStream extends OutputStream implements Disposable {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);
protected int count;
final byte buffer[];
private final ActiveMQConnection connection;
private final Map<String, Object> properties;
private final ProducerInfo info;
private long messageSequence;
private boolean closed;
private final int deliveryMode;
private final int priority;
private final long timeToLive;
private boolean alwaysSyncSend = false;
private boolean addPropertiesOnFirstMsgOnly = false;
/**
* JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
*/
public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
long timeToLive) throws JMSException {
this.connection = connection;
this.deliveryMode = deliveryMode;
this.priority = priority;
this.timeToLive = timeToLive;
this.properties = properties == null ? null : new HashMap<String, Object>(properties);
Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
if (chunkSize == null) {
chunkSize = 64 * 1024;
} else {
if (chunkSize < 1) {
throw new IllegalArgumentException("Chunk size must be greater then 0");
} else {
chunkSize *= 1024;
}
}
buffer = new byte[chunkSize];
if (destination == null) {
throw new InvalidDestinationException("Don't understand null destinations");
}
this.info = new ProducerInfo(producerId);
// Allows the options on the destination to configure the stream
if (destination.getOptions() != null) {
Map<String, String> options = new HashMap<String, String>(destination.getOptions());
IntrospectionSupport.setProperties(this, options, "producer.");
IntrospectionSupport.setProperties(this.info, options, "producer.");
if (options.size() > 0) {
String msg = "There are " + options.size()
+ " producer options that couldn't be set on the producer."
+ " Check the options are spelled correctly."
+ " Unknown parameters=[" + options + "]."
+ " This producer cannot be started.";
LOG.warn(msg);
throw new ConfigurationException(msg);
}
}
this.info.setDestination(destination);
this.connection.addOutputStream(this);
this.connection.asyncSendPacket(info);
}
@Override
public void close() throws IOException {
if (!closed) {
flushBuffer();
try {
// Send an EOS style empty message to signal EOS.
send(new ActiveMQMessage(), true);
dispose();
this.connection.asyncSendPacket(info.createRemoveCommand());
} catch (JMSException e) {
IOExceptionSupport.create(e);
}
}
}
@Override
public void dispose() {
if (!closed) {
this.connection.removeOutputStream(this);
closed = true;
}
}
@Override
public synchronized void write(int b) throws IOException {
buffer[count++] = (byte) b;
if (count == buffer.length) {
flushBuffer();
}
}
@Override
public synchronized void write(byte b[], int off, int len) throws IOException {
while (len > 0) {
int max = Math.min(len, buffer.length - count);
System.arraycopy(b, off, buffer, count, max);
len -= max;
count += max;
off += max;
if (count == buffer.length) {
flushBuffer();
}
}
}
@Override
public synchronized void flush() throws IOException {
flushBuffer();
}
private void flushBuffer() throws IOException {
if (count != 0) {
try {
ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
msg.writeBytes(buffer, 0, count);
send(msg, false);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
count = 0;
}
}
/**
* @param msg
* @throws JMSException
*/
private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) {
for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
String key = iter.next();
Object value = properties.get(key);
msg.setObjectProperty(key, value);
}
}
msg.setType("org.apache.activemq.Stream");
msg.setGroupID(info.getProducerId().toString());
if (eosMessage) {
msg.setGroupSequence(-1);
} else {
msg.setGroupSequence((int) messageSequence);
}
MessageId id = new MessageId(info.getProducerId(), messageSequence++);
connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
}
@Override
public String toString() {
return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
}
public boolean isAlwaysSyncSend() {
return alwaysSyncSend;
}
public void setAlwaysSyncSend(boolean alwaysSyncSend) {
this.alwaysSyncSend = alwaysSyncSend;
}
public boolean isAddPropertiesOnFirstMsgOnly() {
return addPropertiesOnFirstMsgOnly;
}
public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) {
this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly;
}
}

View File

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Topic;
/**
* The StreamConnection interface allows you to send and receive data from a
* Destination in using standard java InputStream and OutputStream objects. It's
* best use case is to send and receive large amounts of data that would be to
* large to hold in a single JMS message.
*
*
*/
@Deprecated
public interface StreamConnection extends Connection {
InputStream createInputStream(Destination dest) throws JMSException;
InputStream createInputStream(Destination dest, String messageSelector) throws JMSException;
InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException;
InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException;
OutputStream createOutputStream(Destination dest) throws JMSException;
OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException;
/**
* Unsubscribes a durable subscription that has been created by a client.
* <P>
* This method deletes the state being maintained on behalf of the
* subscriber by its provider.
* <P>
* It is erroneous for a client to delete a durable subscription while there
* is an active <CODE>MessageConsumer </CODE> or
* <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
* message is part of a pending transaction or has not been acknowledged in
* the session.
*
* @param name the name used to identify this subscription
* @throws JMSException if the session fails to unsubscribe to the durable
* subscription due to some internal error.
* @throws InvalidDestinationException if an invalid subscription name is
* specified.
* @since 1.1
*/
void unsubscribe(String name) throws JMSException;
}

View File

@ -1,145 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.InputStream;
import java.io.OutputStream;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Deprecated
public class ActiveMQInputStreamTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class);
private static final String BROKER_URL = "tcp://localhost:0";
private static final String DESTINATION = "destination";
private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
private BrokerService broker;
private String connectionUri;
@Override
public void setUp() throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistent(false);
broker.setDestinations(new ActiveMQDestination[] {
ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),
});
broker.addConnector(BROKER_URL);
broker.start();
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
@Override
public void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
public void testInputStreamSetSyncSendOption() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true");
OutputStream out = null;
try {
out = connection.createOutputStream(destination);
assertTrue(((ActiveMQOutputStream)out).isAlwaysSyncSend());
LOG.debug("writing...");
for (int i = 0; i < STREAM_LENGTH; ++i) {
out.write(0);
}
LOG.debug("wrote " + STREAM_LENGTH + " bytes");
} finally {
if (out != null) {
out.close();
}
}
InputStream in = null;
try {
in = connection.createInputStream(destination);
LOG.debug("reading...");
int count = 0;
while (-1 != in.read()) {
++count;
}
LOG.debug("read " + count + " bytes");
} finally {
if (in != null) {
in.close();
}
}
connection.close();
}
public void testInputStreamMatchesDefaultChuckSize() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(DESTINATION);
OutputStream out = null;
try {
out = connection.createOutputStream(destination);
LOG.debug("writing...");
for (int i = 0; i < STREAM_LENGTH; ++i) {
out.write(0);
}
LOG.debug("wrote " + STREAM_LENGTH + " bytes");
} finally {
if (out != null) {
out.close();
}
}
InputStream in = null;
try {
in = connection.createInputStream(destination);
LOG.debug("reading...");
int count = 0;
while (-1 != in.read()) {
++count;
}
LOG.debug("read " + count + " bytes");
} finally {
if (in != null) {
in.close();
}
}
connection.close();
}
}

View File

@ -1,162 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.Session;
import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author rnewson
*/
public final class LargeStreamletTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class);
private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
private static final int BUFFER_SIZE = 1 * 1024;
private static final int MESSAGE_COUNT = 10 * 1024;
protected Exception writerException;
protected Exception readerException;
private final AtomicInteger totalRead = new AtomicInteger();
private final AtomicInteger totalWritten = new AtomicInteger();
private final AtomicBoolean stopThreads = new AtomicBoolean(false);
public void testStreamlets() throws Exception {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
final ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection();
connection.start();
try {
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
final Destination destination = session.createQueue("wibble");
final Thread readerThread = new Thread(new Runnable() {
@Override
public void run() {
totalRead.set(0);
try {
final InputStream inputStream = connection.createInputStream(destination);
try {
int read;
final byte[] buf = new byte[BUFFER_SIZE];
while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) {
totalRead.addAndGet(read);
}
} finally {
inputStream.close();
}
} catch (Exception e) {
readerException = e;
e.printStackTrace();
} finally {
LOG.info(totalRead + " total bytes read.");
}
}
});
final Thread writerThread = new Thread(new Runnable() {
private final Random random = new Random();
@Override
public void run() {
totalWritten.set(0);
int count = MESSAGE_COUNT;
try {
final OutputStream outputStream = connection.createOutputStream(destination);
try {
final byte[] buf = new byte[BUFFER_SIZE];
random.nextBytes(buf);
while (count > 0 && !stopThreads.get()) {
outputStream.write(buf);
totalWritten.addAndGet(buf.length);
count--;
}
} finally {
outputStream.close();
}
} catch (Exception e) {
writerException = e;
e.printStackTrace();
} finally {
LOG.info(totalWritten + " total bytes written.");
}
}
});
readerThread.start();
writerThread.start();
// Wait till reader is has finished receiving all the messages
// or he has stopped
// receiving messages.
Thread.sleep(1000);
int lastRead = totalRead.get();
while (readerThread.isAlive()) {
readerThread.join(1000);
// No progress?? then stop waiting..
if (lastRead == totalRead.get()) {
break;
}
lastRead = totalRead.get();
}
stopThreads.set(true);
assertTrue("Should not have received a reader exception", readerException == null);
assertTrue("Should not have received a writer exception", writerException == null);
assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get());
} finally {
session.close();
}
} finally {
connection.close();
}
}
}

View File

@ -1,281 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.streams;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQInputStream;
import org.apache.activemq.ActiveMQOutputStream;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
/**
* JMSInputStreamTest
*/
@Deprecated
public class JMSInputStreamTest extends JmsTestSupport {
public Destination destination;
protected DataOutputStream out;
protected DataInputStream in;
private ActiveMQConnection connection2;
private ActiveMQInputStream amqIn;
private ActiveMQOutputStream amqOut;
public static Test suite() {
return suite(JMSInputStreamTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
public void initCombos() {
addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC") });
}
@Override
protected void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
}
private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException {
connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
connections.add(connection2);
if (props != null) {
amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
} else {
amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination);
}
out = new DataOutputStream(amqOut);
if (timeout == -1) {
amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
} else {
amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout);
}
in = new DataInputStream(amqIn);
}
/*
* @see TestCase#tearDown()
*/
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
/**
* Test for AMQ-3010
*/
public void testInputStreamTimeout() throws Exception {
long timeout = 500;
setUpConnection(null, timeout);
try {
in.read();
fail();
} catch (ActiveMQInputStream.ReadTimeoutException e) {
// timeout reached, everything ok
}
in.close();
}
// Test for AMQ-2988
public void testStreamsWithProperties() throws Exception {
String name1 = "PROPERTY_1";
String name2 = "PROPERTY_2";
String value1 = "VALUE_1";
String value2 = "VALUE_2";
Map<String, Object> jmsProperties = new HashMap<String, Object>();
jmsProperties.put(name1, value1);
jmsProperties.put(name2, value2);
setUpConnection(jmsProperties, -1);
out.writeInt(4);
out.flush();
assertTrue(in.readInt() == 4);
out.writeFloat(2.3f);
out.flush();
assertTrue(in.readFloat() == 2.3f);
String str = "this is a test string";
out.writeUTF(str);
out.flush();
assertTrue(in.readUTF().equals(str));
for (int i = 0; i < 100; i++) {
out.writeLong(i);
}
out.flush();
// check properties before we try to read the stream
checkProperties(jmsProperties);
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
}
// check again after read was done
checkProperties(jmsProperties);
}
public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception {
String name1 = "PROPERTY_1";
String name2 = "PROPERTY_2";
String value1 = "VALUE_1";
String value2 = "VALUE_2";
Map<String, Object> jmsProperties = new HashMap<String, Object>();
jmsProperties.put(name1, value1);
jmsProperties.put(name2, value2);
ActiveMQDestination dest = (ActiveMQDestination) destination;
if (dest.isQueue()) {
destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
} else {
destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
}
setUpConnection(jmsProperties, -1);
assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly());
out.writeInt(4);
out.flush();
assertTrue(in.readInt() == 4);
out.writeFloat(2.3f);
out.flush();
assertTrue(in.readFloat() == 2.3f);
String str = "this is a test string";
out.writeUTF(str);
out.flush();
assertTrue(in.readUTF().equals(str));
for (int i = 0; i < 100; i++) {
out.writeLong(i);
}
out.flush();
// check properties before we try to read the stream
checkProperties(jmsProperties);
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
}
// check again after read was done
checkProperties(jmsProperties);
}
// check if the received stream has the properties set
// Test for AMQ-2988
private void checkProperties(Map<String, Object> jmsProperties) throws IOException {
Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
// we should at least have the same amount or more properties
assertTrue(jmsProperties.size() <= receivedJmsProps.size());
// check the properties to see if we have everything in there
Iterator<String> propsIt = jmsProperties.keySet().iterator();
while (propsIt.hasNext()) {
String key = propsIt.next();
assertTrue(receivedJmsProps.containsKey(key));
assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
}
}
public void testLarge() throws Exception {
setUpConnection(null, -1);
final int testData = 23;
final int dataLength = 4096;
final int count = 1024;
byte[] data = new byte[dataLength];
for (int i = 0; i < data.length; i++) {
data[i] = testData;
}
final AtomicBoolean complete = new AtomicBoolean(false);
Thread runner = new Thread(new Runnable() {
@Override
public void run() {
try {
for (int x = 0; x < count; x++) {
byte[] b = new byte[2048];
in.readFully(b);
for (int i = 0; i < b.length; i++) {
assertTrue(b[i] == testData);
}
}
complete.set(true);
synchronized (complete) {
complete.notify();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
});
runner.start();
for (int i = 0; i < count; i++) {
out.write(data);
}
out.flush();
synchronized (complete) {
if (!complete.get()) {
complete.wait(30000);
}
}
assertTrue(complete.get());
}
public void testStreams() throws Exception {
setUpConnection(null, -1);
out.writeInt(4);
out.flush();
assertTrue(in.readInt() == 4);
out.writeFloat(2.3f);
out.flush();
assertTrue(in.readFloat() == 2.3f);
String str = "this is a test string";
out.writeUTF(str);
out.flush();
assertTrue(in.readUTF().equals(str));
for (int i = 0; i < 100; i++) {
out.writeLong(i);
}
out.flush();
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
}
}
}