From b79c9868ece707c83081797de8a5d8151434b90a Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 7 Jan 2013 17:22:30 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-4241 Add a configuration option to control whether properties are set on every output message or only the first one. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1429909 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQInputStream.java | 34 ++++-- .../apache/activemq/ActiveMQOutputStream.java | 32 +++++- .../activemq/streams/JMSInputStreamTest.java | 106 ++++++++++++------ 3 files changed, 128 insertions(+), 44 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java index 22ee6f7c3f..4937431248 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java @@ -21,9 +21,11 @@ import java.io.InputStream; import java.util.Collections; import java.util.HashMap; import java.util.Map; + import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; + import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -39,7 +41,7 @@ import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; /** - * + * */ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher { @@ -57,7 +59,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch private ProducerId producerId; private long nextSequenceId; - private long timeout; + private final long timeout; private boolean firstReceived; public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout) @@ -86,7 +88,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1"); this.timeout = timeout; - + this.info = new ConsumerInfo(consumerId); this.info.setSubscriptionName(name); @@ -155,6 +157,21 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch return jmsProperties; } + /** + * This method allows the client to receive the Stream data as unaltered ActiveMQMessage + * object which is how the split stream data is sent. Each message will contains one + * chunk of the written bytes as well as a valid message group sequence id. The EOS + * message will have a message group sequence id of -1. + * + * This method is useful for testing, but should never be mixed with calls to the + * normal stream receive methods as it will break the normal stream processing flow + * and can lead to loss of data. + * + * @return an ActiveMQMessage object that either contains byte data or an end of strem + * marker. + * @throws JMSException + * @throws ReadTimeoutException + */ public ActiveMQMessage receive() throws JMSException, ReadTimeoutException { checkClosed(); MessageDispatch md; @@ -198,7 +215,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch } /** - * + * * @see InputStream#read() * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout */ @@ -211,9 +228,9 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch return buffer[pos++] & 0xff; } - + /** - * + * * @see InputStream#read(byte[], int, int) * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout */ @@ -285,6 +302,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch } } + @Override public void dispatch(MessageDispatch md) { unconsumedMessages.enqueue(md); } @@ -294,12 +312,12 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }"; } - /** * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout - * */ public class ReadTimeoutException extends IOException { + private static final long serialVersionUID = -3217758894326719909L; + public ReadTimeoutException() { super(); } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java index 0268f95064..0427c9dfbd 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java @@ -33,12 +33,13 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IntrospectionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** - * - */ public class ActiveMQOutputStream extends OutputStream implements Disposable { + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class); + protected int count; final byte buffer[]; @@ -53,6 +54,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { private final int priority; private final long timeToLive; private boolean alwaysSyncSend = false; + private boolean addPropertiesOnFirstMsgOnly = false; /** * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb @@ -91,6 +93,15 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { Map options = new HashMap(destination.getOptions()); IntrospectionSupport.setProperties(this, options, "producer."); IntrospectionSupport.setProperties(this.info, options, "producer."); + if (options.size() > 0) { + String msg = "There are " + options.size() + + " producer options that couldn't be set on the producer." + + " Check the options are spelled correctly." + + " Unknown parameters=[" + options + "]." + + " This producer cannot be started."; + LOG.warn(msg); + throw new ConfigurationException(msg); + } } this.info.setDestination(destination); @@ -99,6 +110,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { this.connection.asyncSendPacket(info); } + @Override public void close() throws IOException { if (!closed) { flushBuffer(); @@ -113,6 +125,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { } } + @Override public void dispose() { if (!closed) { this.connection.removeOutputStream(this); @@ -120,6 +133,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { } } + @Override public synchronized void write(int b) throws IOException { buffer[count++] = (byte) b; if (count == buffer.length) { @@ -127,6 +141,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { } } + @Override public synchronized void write(byte b[], int off, int len) throws IOException { while (len > 0) { int max = Math.min(len, buffer.length - count); @@ -142,6 +157,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { } } + @Override public synchronized void flush() throws IOException { flushBuffer(); } @@ -164,7 +180,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { * @throws JMSException */ private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException { - if (properties != null) { + if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) { for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) { String key = iter.next(); Object value = properties.get(key); @@ -182,6 +198,7 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend()); } + @Override public String toString() { return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }"; } @@ -194,4 +211,11 @@ public class ActiveMQOutputStream extends OutputStream implements Disposable { this.alwaysSyncSend = alwaysSyncSend; } + public boolean isAddPropertiesOnFirstMsgOnly() { + return addPropertiesOnFirstMsgOnly; + } + + public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) { + this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java index bcc63baa18..25b514a834 100755 --- a/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java @@ -19,7 +19,6 @@ package org.apache.activemq.streams; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -30,9 +29,12 @@ import javax.jms.JMSException; import javax.jms.Message; import junit.framework.Test; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQInputStream; +import org.apache.activemq.ActiveMQOutputStream; import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -47,8 +49,8 @@ public class JMSInputStreamTest extends JmsTestSupport { private ActiveMQConnection connection2; private ActiveMQInputStream amqIn; + private ActiveMQOutputStream amqOut; - public static Test suite() { return suite(JMSInputStreamTest.class); } @@ -58,32 +60,24 @@ public class JMSInputStreamTest extends JmsTestSupport { } public void initCombos() { - addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC")}); + addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC") }); } - /* - * @see TestCase#setUp() - */ + @Override protected void setUp() throws Exception { super.setAutoFail(true); super.setUp(); } - /** - * Setup connection and streams - * - * @param props - * @throws JMSException - */ private void setUpConnection(Map props, long timeout) throws JMSException { - connection2 = (ActiveMQConnection)factory.createConnection(userName, password); + connection2 = (ActiveMQConnection) factory.createConnection(userName, password); connections.add(connection2); - OutputStream amqOut; if (props != null) { - amqOut = connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); } else { - amqOut = connection.createOutputStream(destination); + amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination); } + out = new DataOutputStream(amqOut); if (timeout == -1) { amqIn = (ActiveMQInputStream) connection2.createInputStream(destination); @@ -92,9 +86,11 @@ public class JMSInputStreamTest extends JmsTestSupport { } in = new DataInputStream(amqIn); } + /* * @see TestCase#tearDown() */ + @Override protected void tearDown() throws Exception { super.tearDown(); } @@ -104,7 +100,7 @@ public class JMSInputStreamTest extends JmsTestSupport { */ public void testInputStreamTimeout() throws Exception { long timeout = 500; - + setUpConnection(null, timeout); try { in.read(); @@ -113,7 +109,6 @@ public class JMSInputStreamTest extends JmsTestSupport { // timeout reached, everything ok } in.close(); - } // Test for AMQ-2988 @@ -122,11 +117,11 @@ public class JMSInputStreamTest extends JmsTestSupport { String name2 = "PROPERTY_2"; String value1 = "VALUE_1"; String value2 = "VALUE_2"; - Map jmsProperties = new HashMap(); + Map jmsProperties = new HashMap(); jmsProperties.put(name1, value1); jmsProperties.put(name2, value2); setUpConnection(jmsProperties, -1); - + out.writeInt(4); out.flush(); assertTrue(in.readInt() == 4); @@ -141,39 +136,85 @@ public class JMSInputStreamTest extends JmsTestSupport { out.writeLong(i); } out.flush(); - + // check properties before we try to read the stream checkProperties(jmsProperties); - + for (int i = 0; i < 100; i++) { assertTrue(in.readLong() == i); } - + // check again after read was done checkProperties(jmsProperties); } - + + public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception { + String name1 = "PROPERTY_1"; + String name2 = "PROPERTY_2"; + String value1 = "VALUE_1"; + String value2 = "VALUE_2"; + Map jmsProperties = new HashMap(); + jmsProperties.put(name1, value1); + jmsProperties.put(name2, value2); + + ActiveMQDestination dest = (ActiveMQDestination) destination; + + if (dest.isQueue()) { + destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true"); + } else { + destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true"); + } + + setUpConnection(jmsProperties, -1); + + assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly()); + + out.writeInt(4); + out.flush(); + assertTrue(in.readInt() == 4); + out.writeFloat(2.3f); + out.flush(); + assertTrue(in.readFloat() == 2.3f); + String str = "this is a test string"; + out.writeUTF(str); + out.flush(); + assertTrue(in.readUTF().equals(str)); + for (int i = 0; i < 100; i++) { + out.writeLong(i); + } + out.flush(); + + // check properties before we try to read the stream + checkProperties(jmsProperties); + + for (int i = 0; i < 100; i++) { + assertTrue(in.readLong() == i); + } + + // check again after read was done + checkProperties(jmsProperties); + } + // check if the received stream has the properties set // Test for AMQ-2988 private void checkProperties(Map jmsProperties) throws IOException { Map receivedJmsProps = amqIn.getJMSProperties(); - + // we should at least have the same amount or more properties assertTrue(jmsProperties.size() <= receivedJmsProps.size()); - // check the properties to see if we have everything in there - Iterator propsIt = jmsProperties.keySet().iterator(); - while(propsIt.hasNext()) { + Iterator propsIt = jmsProperties.keySet().iterator(); + while (propsIt.hasNext()) { String key = propsIt.next(); assertTrue(receivedJmsProps.containsKey(key)); assertEquals(jmsProperties.get(key), receivedJmsProps.get(key)); } } - + public void testLarge() throws Exception { setUpConnection(null, -1); - + final int testData = 23; final int dataLength = 4096; final int count = 1024; @@ -183,6 +224,7 @@ public class JMSInputStreamTest extends JmsTestSupport { } final AtomicBoolean complete = new AtomicBoolean(false); Thread runner = new Thread(new Runnable() { + @Override public void run() { try { for (int x = 0; x < count; x++) { @@ -213,7 +255,7 @@ public class JMSInputStreamTest extends JmsTestSupport { } assertTrue(complete.get()); } - + public void testStreams() throws Exception { setUpConnection(null, -1); out.writeInt(4); @@ -230,7 +272,7 @@ public class JMSInputStreamTest extends JmsTestSupport { out.writeLong(i); } out.flush(); - + for (int i = 0; i < 100; i++) { assertTrue(in.readLong() == i); }