git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1061304 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2011-01-20 14:03:30 +00:00
parent 53ea4caa20
commit 9c78797f5d
4 changed files with 97 additions and 30 deletions

View File

@ -2040,9 +2040,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null);
return createInputStream(dest, messageSelector, noLocal, -1);
}
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
}
public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
return createInputStream(dest, null, false);
}
@ -2052,13 +2058,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, name);
return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
}
private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException {
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
}
private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
}
/**
@ -2367,4 +2377,5 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setCheckForDuplicates(boolean checkForDuplicates) {
this.checkForDuplicates = checkForDuplicates;
}
}

View File

@ -57,8 +57,10 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
private ProducerId producerId;
private long nextSequenceId;
private long timeout;
private boolean firstReceived;
public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch)
public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout)
throws JMSException {
this.connection = connection;
@ -82,6 +84,9 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
}
}
if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
this.timeout = timeout;
this.info = new ConsumerInfo(consumerId);
this.info.setSubscriptionName(name);
@ -150,11 +155,17 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
return jmsProperties;
}
public ActiveMQMessage receive() throws JMSException {
public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
checkClosed();
MessageDispatch md;
try {
md = unconsumedMessages.dequeue(-1);
if (firstReceived || timeout == -1) {
md = unconsumedMessages.dequeue(-1);
firstReceived = true;
} else {
md = unconsumedMessages.dequeue(timeout);
if (md == null) throw new ReadTimeoutException();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e);
@ -186,6 +197,11 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
}
}
/**
*
* @see InputStream#read()
* @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
*/
@Override
public int read() throws IOException {
fillBuffer();
@ -195,7 +211,12 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
return buffer[pos++] & 0xff;
}
/**
*
* @see InputStream#read(byte[], int, int)
* @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
fillBuffer();
@ -273,4 +294,14 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
}
/**
* Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
*
*/
public class ReadTimeoutException extends IOException {
public ReadTimeoutException() {
super();
}
}
}

View File

@ -42,11 +42,15 @@ public interface StreamConnection extends Connection {
InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException;
InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException;
InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException;
OutputStream createOutputStream(Destination dest) throws JMSException;

View File

@ -75,7 +75,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
* @param props
* @throws JMSException
*/
private void setUpConnection(Map<String, Object> props) throws JMSException {
private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException {
connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection2);
OutputStream amqOut;
@ -85,7 +85,11 @@ public class JMSInputStreamTest extends JmsTestSupport {
amqOut = connection.createOutputStream(destination);
}
out = new DataOutputStream(amqOut);
amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
if (timeout == -1) {
amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
} else {
amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout);
}
in = new DataInputStream(amqIn);
}
/*
@ -95,26 +99,21 @@ public class JMSInputStreamTest extends JmsTestSupport {
super.tearDown();
}
public void testStreams() throws Exception {
setUpConnection(null);
out.writeInt(4);
out.flush();
assertTrue(in.readInt() == 4);
out.writeFloat(2.3f);
out.flush();
assertTrue(in.readFloat() == 2.3f);
String str = "this is a test string";
out.writeUTF(str);
out.flush();
assertTrue(in.readUTF().equals(str));
for (int i = 0; i < 100; i++) {
out.writeLong(i);
}
out.flush();
/**
* Test for AMQ-3010
*/
public void testInputStreamTimeout() throws Exception {
long timeout = 500;
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
setUpConnection(null, timeout);
try {
in.read();
fail();
} catch (ActiveMQInputStream.ReadTimeoutException e) {
// timeout reached, everything ok
}
in.close();
}
// Test for AMQ-2988
@ -126,7 +125,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
Map<String,Object> jmsProperties = new HashMap<String, Object>();
jmsProperties.put(name1, value1);
jmsProperties.put(name2, value2);
setUpConnection(jmsProperties);
setUpConnection(jmsProperties, -1);
out.writeInt(4);
out.flush();
@ -173,7 +172,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
}
public void testLarge() throws Exception {
setUpConnection(null);
setUpConnection(null, -1);
final int testData = 23;
final int dataLength = 4096;
@ -214,4 +213,26 @@ public class JMSInputStreamTest extends JmsTestSupport {
}
assertTrue(complete.get());
}
public void testStreams() throws Exception {
setUpConnection(null, -1);
out.writeInt(4);
out.flush();
assertTrue(in.readInt() == 4);
out.writeFloat(2.3f);
out.flush();
assertTrue(in.readFloat() == 2.3f);
String str = "this is a test string";
out.writeUTF(str);
out.flush();
assertTrue(in.readUTF().equals(str));
for (int i = 0; i < 100; i++) {
out.writeLong(i);
}
out.flush();
for (int i = 0; i < 100; i++) {
assertTrue(in.readLong() == i);
}
}
}