mirror of https://github.com/apache/activemq.git
support destination options on a stream consumer.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383971 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5ff3487b2f
commit
c977f7ddfc
|
@ -18,6 +18,7 @@ package org.apache.activemq;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
|
||||
import javax.jms.IllegalStateException;
|
||||
import javax.jms.InvalidDestinationException;
|
||||
|
@ -34,6 +35,7 @@ import org.apache.activemq.command.MessageDispatch;
|
|||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.selector.SelectorParser;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.apache.activemq.util.JMSExceptionSupport;
|
||||
|
||||
/**
|
||||
|
@ -81,7 +83,6 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
|
|||
}
|
||||
|
||||
this.info = new ConsumerInfo(consumerId);
|
||||
this.info.setDestination(dest);
|
||||
this.info.setSubcriptionName(name);
|
||||
|
||||
if (selector != null && selector.trim().length() != 0) {
|
||||
|
@ -92,11 +93,19 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
|
|||
|
||||
new SelectorParser().parse(selector);
|
||||
this.info.setSelector(selector);
|
||||
|
||||
|
||||
this.info.setPrefetchSize(prefetch);
|
||||
this.info.setNoLocal(noLocal);
|
||||
this.info.setBrowser(false);
|
||||
this.info.setDispatchAsync(false);
|
||||
|
||||
// Allows the options on the destination to configure the consumerInfo
|
||||
if (dest.getOptions() != null) {
|
||||
HashMap options = new HashMap(dest.getOptions());
|
||||
IntrospectionSupport.setProperties(this.info, options, "consumer.");
|
||||
}
|
||||
|
||||
this.info.setDestination(dest);
|
||||
|
||||
this.connection.addInputStream(this);
|
||||
this.connection.addDispatcher(info.getConsumerId(), this);
|
||||
|
|
Loading…
Reference in New Issue