mirror of https://github.com/apache/activemq.git
apply patch for: https://issues.apache.org/activemq/browse/AMQ-2988
with modifications to make the properties map unmodifiable and use the generic Collections.emptyMap method instead of creating a new empty map. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1027282 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fe88131fcd
commit
8262eec63d
|
@ -18,6 +18,7 @@ package org.apache.activemq;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import javax.jms.IllegalStateException;
|
import javax.jms.IllegalStateException;
|
||||||
|
@ -52,6 +53,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
|
||||||
private boolean eosReached;
|
private boolean eosReached;
|
||||||
private byte buffer[];
|
private byte buffer[];
|
||||||
private int pos;
|
private int pos;
|
||||||
|
private Map<String, Object> jmsProperties;
|
||||||
|
|
||||||
private ProducerId producerId;
|
private ProducerId producerId;
|
||||||
private long nextSequenceId;
|
private long nextSequenceId;
|
||||||
|
@ -135,6 +137,19 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the JMS Properties which where used to send the InputStream
|
||||||
|
*
|
||||||
|
* @return jmsProperties
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Map<String, Object> getJMSProperties() throws IOException {
|
||||||
|
if (jmsProperties == null) {
|
||||||
|
fillBuffer();
|
||||||
|
}
|
||||||
|
return jmsProperties;
|
||||||
|
}
|
||||||
|
|
||||||
public ActiveMQMessage receive() throws JMSException {
|
public ActiveMQMessage receive() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
MessageDispatch md;
|
MessageDispatch md;
|
||||||
|
@ -227,13 +242,24 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
|
||||||
buffer = new byte[(int)bm.getBodyLength()];
|
buffer = new byte[(int)bm.getBodyLength()];
|
||||||
bm.readBytes(buffer);
|
bm.readBytes(buffer);
|
||||||
pos = 0;
|
pos = 0;
|
||||||
|
if (jmsProperties == null) {
|
||||||
|
jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
eosReached = true;
|
eosReached = true;
|
||||||
|
if (jmsProperties == null) {
|
||||||
|
// no properties found
|
||||||
|
jmsProperties = Collections.emptyMap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
eosReached = true;
|
eosReached = true;
|
||||||
|
if (jmsProperties == null) {
|
||||||
|
// no properties found
|
||||||
|
jmsProperties = Collections.emptyMap();
|
||||||
|
}
|
||||||
throw IOExceptionSupport.create(e);
|
throw IOExceptionSupport.create(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,20 @@ package org.apache.activemq.streams;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
|
||||||
import junit.framework.Test;
|
import junit.framework.Test;
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQInputStream;
|
||||||
import org.apache.activemq.JmsTestSupport;
|
import org.apache.activemq.JmsTestSupport;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
@ -38,6 +46,8 @@ public class JMSInputStreamTest extends JmsTestSupport {
|
||||||
protected DataInputStream in;
|
protected DataInputStream in;
|
||||||
private ActiveMQConnection connection2;
|
private ActiveMQConnection connection2;
|
||||||
|
|
||||||
|
private ActiveMQInputStream amqIn;
|
||||||
|
|
||||||
|
|
||||||
public static Test suite() {
|
public static Test suite() {
|
||||||
return suite(JMSInputStreamTest.class);
|
return suite(JMSInputStreamTest.class);
|
||||||
|
@ -57,12 +67,27 @@ public class JMSInputStreamTest extends JmsTestSupport {
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
super.setAutoFail(true);
|
super.setAutoFail(true);
|
||||||
super.setUp();
|
super.setUp();
|
||||||
connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
|
|
||||||
connections.add(connection2);
|
|
||||||
out = new DataOutputStream(connection.createOutputStream(destination));
|
|
||||||
in = new DataInputStream(connection2.createInputStream(destination));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setup connection and streams
|
||||||
|
*
|
||||||
|
* @param props
|
||||||
|
* @throws JMSException
|
||||||
|
*/
|
||||||
|
private void setUpConnection(Map<String, Object> props) throws JMSException {
|
||||||
|
connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
|
||||||
|
connections.add(connection2);
|
||||||
|
OutputStream amqOut;
|
||||||
|
if (props != null) {
|
||||||
|
amqOut = connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||||
|
} else {
|
||||||
|
amqOut = connection.createOutputStream(destination);
|
||||||
|
}
|
||||||
|
out = new DataOutputStream(amqOut);
|
||||||
|
amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
|
||||||
|
in = new DataInputStream(amqIn);
|
||||||
|
}
|
||||||
/*
|
/*
|
||||||
* @see TestCase#tearDown()
|
* @see TestCase#tearDown()
|
||||||
*/
|
*/
|
||||||
|
@ -71,6 +96,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testStreams() throws Exception {
|
public void testStreams() throws Exception {
|
||||||
|
setUpConnection(null);
|
||||||
out.writeInt(4);
|
out.writeInt(4);
|
||||||
out.flush();
|
out.flush();
|
||||||
assertTrue(in.readInt() == 4);
|
assertTrue(in.readInt() == 4);
|
||||||
|
@ -85,12 +111,70 @@ public class JMSInputStreamTest extends JmsTestSupport {
|
||||||
out.writeLong(i);
|
out.writeLong(i);
|
||||||
}
|
}
|
||||||
out.flush();
|
out.flush();
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
assertTrue(in.readLong() == i);
|
assertTrue(in.readLong() == i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test for AMQ-2988
|
||||||
|
public void testStreamsWithProperties() throws Exception {
|
||||||
|
String name1 = "PROPERTY_1";
|
||||||
|
String name2 = "PROPERTY_2";
|
||||||
|
String value1 = "VALUE_1";
|
||||||
|
String value2 = "VALUE_2";
|
||||||
|
Map<String,Object> jmsProperties = new HashMap<String, Object>();
|
||||||
|
jmsProperties.put(name1, value1);
|
||||||
|
jmsProperties.put(name2, value2);
|
||||||
|
setUpConnection(jmsProperties);
|
||||||
|
|
||||||
|
out.writeInt(4);
|
||||||
|
out.flush();
|
||||||
|
assertTrue(in.readInt() == 4);
|
||||||
|
out.writeFloat(2.3f);
|
||||||
|
out.flush();
|
||||||
|
assertTrue(in.readFloat() == 2.3f);
|
||||||
|
String str = "this is a test string";
|
||||||
|
out.writeUTF(str);
|
||||||
|
out.flush();
|
||||||
|
assertTrue(in.readUTF().equals(str));
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
out.writeLong(i);
|
||||||
|
}
|
||||||
|
out.flush();
|
||||||
|
|
||||||
|
// check properties before we try to read the stream
|
||||||
|
checkProperties(jmsProperties);
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
assertTrue(in.readLong() == i);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check again after read was done
|
||||||
|
checkProperties(jmsProperties);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if the received stream has the properties set
|
||||||
|
// Test for AMQ-2988
|
||||||
|
private void checkProperties(Map<String, Object> jmsProperties) throws IOException {
|
||||||
|
Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
|
||||||
|
|
||||||
|
// we should at least have the same amount or more properties
|
||||||
|
assertTrue(jmsProperties.size() <= receivedJmsProps.size());
|
||||||
|
|
||||||
|
|
||||||
|
// check the properties to see if we have everything in there
|
||||||
|
Iterator<String> propsIt = jmsProperties.keySet().iterator();
|
||||||
|
while(propsIt.hasNext()) {
|
||||||
|
String key = propsIt.next();
|
||||||
|
assertTrue(receivedJmsProps.containsKey(key));
|
||||||
|
assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testLarge() throws Exception {
|
public void testLarge() throws Exception {
|
||||||
|
setUpConnection(null);
|
||||||
|
|
||||||
final int testData = 23;
|
final int testData = 23;
|
||||||
final int dataLength = 4096;
|
final int dataLength = 4096;
|
||||||
final int count = 1024;
|
final int count = 1024;
|
||||||
|
|
Loading…
Reference in New Issue