Add a configuration option to control whether properties are set on every output message or only the first one. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1429909 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-07 17:22:30 +00:00
parent 3cd8da80a6
commit b79c9868ec
3 changed files with 128 additions and 44 deletions

View File

@ -21,9 +21,11 @@ import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -39,7 +41,7 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
/**
*
*
*/
public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
@ -57,7 +59,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
private ProducerId producerId;
private long nextSequenceId;
private long timeout;
private final long timeout;
private boolean firstReceived;
public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout)
@ -86,7 +88,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
this.timeout = timeout;
this.info = new ConsumerInfo(consumerId);
this.info.setSubscriptionName(name);
@ -155,6 +157,21 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
return jmsProperties;
}
/**
* This method allows the client to receive the Stream data as unaltered ActiveMQMessage
* object which is how the split stream data is sent. Each message will contains one
* chunk of the written bytes as well as a valid message group sequence id. The EOS
* message will have a message group sequence id of -1.
*
* This method is useful for testing, but should never be mixed with calls to the
* normal stream receive methods as it will break the normal stream processing flow
* and can lead to loss of data.
*
* @return an ActiveMQMessage object that either contains byte data or an end of strem
* marker.
* @throws JMSException
* @throws ReadTimeoutException
*/
public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
checkClosed();
MessageDispatch md;
@ -198,7 +215,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
}
/**
*
*
* @see InputStream#read()
* @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
*/
@ -211,9 +228,9 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
return buffer[pos++] & 0xff;
}
/**
*
*
* @see InputStream#read(byte[], int, int)
* @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
*/
@ -285,6 +302,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
}
}
@Override
public void dispatch(MessageDispatch md) {
unconsumedMessages.enqueue(md);
}
@ -294,12 +312,12 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
}
/**
* Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
*
*/
public class ReadTimeoutException extends IOException {
private static final long serialVersionUID = -3217758894326719909L;
public ReadTimeoutException() {
super();
}

View File

@ -33,12 +33,13 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class ActiveMQOutputStream extends OutputStream implements Disposable {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);
protected int count;
final byte buffer[];
@ -53,6 +54,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
private final int priority;
private final long timeToLive;
private boolean alwaysSyncSend = false;
private boolean addPropertiesOnFirstMsgOnly = false;
/**
* JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
@ -91,6 +93,15 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
Map<String, String> options = new HashMap<String, String>(destination.getOptions());
IntrospectionSupport.setProperties(this, options, "producer.");
IntrospectionSupport.setProperties(this.info, options, "producer.");
if (options.size() > 0) {
String msg = "There are " + options.size()
+ " producer options that couldn't be set on the producer."
+ " Check the options are spelled correctly."
+ " Unknown parameters=[" + options + "]."
+ " This producer cannot be started.";
LOG.warn(msg);
throw new ConfigurationException(msg);
}
}
this.info.setDestination(destination);
@ -99,6 +110,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
this.connection.asyncSendPacket(info);
}
@Override
public void close() throws IOException {
if (!closed) {
flushBuffer();
@ -113,6 +125,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
}
}
@Override
public void dispose() {
if (!closed) {
this.connection.removeOutputStream(this);
@ -120,6 +133,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
}
}
@Override
public synchronized void write(int b) throws IOException {
buffer[count++] = (byte) b;
if (count == buffer.length) {
@ -127,6 +141,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
}
}
@Override
public synchronized void write(byte b[], int off, int len) throws IOException {
while (len > 0) {
int max = Math.min(len, buffer.length - count);
@ -142,6 +157,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
}
}
@Override
public synchronized void flush() throws IOException {
flushBuffer();
}
@ -164,7 +180,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
* @throws JMSException
*/
private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
if (properties != null) {
if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) {
for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
String key = iter.next();
Object value = properties.get(key);
@ -182,6 +198,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
}
@Override
public String toString() {
return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
}
@ -194,4 +211,11 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable {
this.alwaysSyncSend = alwaysSyncSend;
}
public boolean isAddPropertiesOnFirstMsgOnly() {
return addPropertiesOnFirstMsgOnly;
}
public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) {
this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly;
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.streams;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@ -30,9 +29,12 @@ import javax.jms.JMSException;
import javax.jms.Message;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQInputStream;
import org.apache.activemq.ActiveMQOutputStream;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -47,8 +49,8 @@ public class JMSInputStreamTest extends JmsTestSupport {
private ActiveMQConnection connection2;
private ActiveMQInputStream amqIn;
private ActiveMQOutputStream amqOut;
public static Test suite() {
return suite(JMSInputStreamTest.class);
}
@ -58,32 +60,24 @@ public class JMSInputStreamTest extends JmsTestSupport {
}
public void initCombos() {
addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC")});
addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC") });
}
/*
* @see TestCase#setUp()
*/
@Override
protected void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
}
/**
* Setup connection and streams
*
* @param props
* @throws JMSException
*/
private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException {
connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
connections.add(connection2);
OutputStream amqOut;
if (props != null) {
amqOut = connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
} else {
amqOut = connection.createOutputStream(destination);
amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination);
}
out = new DataOutputStream(amqOut);
if (timeout == -1) {
amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
@ -92,9 +86,11 @@ public class JMSInputStreamTest extends JmsTestSupport {
}
in = new DataInputStream(amqIn);
}
/*
* @see TestCase#tearDown()
*/
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
@ -104,7 +100,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
*/
public void testInputStreamTimeout() throws Exception {
long timeout = 500;
setUpConnection(null, timeout);
try {
in.read();
@ -113,7 +109,6 @@ public class JMSInputStreamTest extends JmsTestSupport {
// timeout reached, everything ok
}
in.close();
}
// Test for AMQ-2988
@ -122,11 +117,11 @@ public class JMSInputStreamTest extends JmsTestSupport {
String name2 = "PROPERTY_2";
String value1 = "VALUE_1";
String value2 = "VALUE_2";
Map<String,Object> jmsProperties = new HashMap<String, Object>();
Map<String, Object> jmsProperties = new HashMap<String, Object>();
jmsProperties.put(name1, value1);
jmsProperties.put(name2, value2);
setUpConnection(jmsProperties, -1);
out.writeInt(4);
out.flush();
assertTrue(in.readInt() == 4);
@ -141,39 +136,85 @@ public class JMSInputStreamTest extends JmsTestSupport {
out.writeLong(i);
}
out.flush();
// check properties before we try to read the stream
checkProperties(jmsProperties);
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
}
// check again after read was done
checkProperties(jmsProperties);
}
public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception {
String name1 = "PROPERTY_1";
String name2 = "PROPERTY_2";
String value1 = "VALUE_1";
String value2 = "VALUE_2";
Map<String, Object> jmsProperties = new HashMap<String, Object>();
jmsProperties.put(name1, value1);
jmsProperties.put(name2, value2);
ActiveMQDestination dest = (ActiveMQDestination) destination;
if (dest.isQueue()) {
destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
} else {
destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
}
setUpConnection(jmsProperties, -1);
assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly());
out.writeInt(4);
out.flush();
assertTrue(in.readInt() == 4);
out.writeFloat(2.3f);
out.flush();
assertTrue(in.readFloat() == 2.3f);
String str = "this is a test string";
out.writeUTF(str);
out.flush();
assertTrue(in.readUTF().equals(str));
for (int i = 0; i < 100; i++) {
out.writeLong(i);
}
out.flush();
// check properties before we try to read the stream
checkProperties(jmsProperties);
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
}
// check again after read was done
checkProperties(jmsProperties);
}
// check if the received stream has the properties set
// Test for AMQ-2988
private void checkProperties(Map<String, Object> jmsProperties) throws IOException {
Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
// we should at least have the same amount or more properties
assertTrue(jmsProperties.size() <= receivedJmsProps.size());
// check the properties to see if we have everything in there
Iterator<String> propsIt = jmsProperties.keySet().iterator();
while(propsIt.hasNext()) {
Iterator<String> propsIt = jmsProperties.keySet().iterator();
while (propsIt.hasNext()) {
String key = propsIt.next();
assertTrue(receivedJmsProps.containsKey(key));
assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
}
}
public void testLarge() throws Exception {
setUpConnection(null, -1);
final int testData = 23;
final int dataLength = 4096;
final int count = 1024;
@ -183,6 +224,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
}
final AtomicBoolean complete = new AtomicBoolean(false);
Thread runner = new Thread(new Runnable() {
@Override
public void run() {
try {
for (int x = 0; x < count; x++) {
@ -213,7 +255,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
}
assertTrue(complete.get());
}
public void testStreams() throws Exception {
setUpConnection(null, -1);
out.writeInt(4);
@ -230,7 +272,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
out.writeLong(i);
}
out.flush();
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
}