mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-1547 - support for configurable selector header name
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@941673 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ce9f83ccfc
commit
a05e84eea3
|
@ -35,7 +35,7 @@ public class RestTest extends JettyTestSupport {
|
||||||
ContentExchange contentExchange = new ContentExchange();
|
ContentExchange contentExchange = new ContentExchange();
|
||||||
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||||
contentExchange.setURL("http://localhost:8080/message/test?timeout=1000&type=queue");
|
contentExchange.setURL("http://localhost:8080/message/test?timeout=1000&type=queue");
|
||||||
contentExchange.setRequestHeader(WebClient.SELECTOR_NAME, "test=2");
|
contentExchange.setRequestHeader("selector", "test=2");
|
||||||
httpClient.send(contentExchange);
|
httpClient.send(contentExchange);
|
||||||
contentExchange.waitForDone();
|
contentExchange.waitForDone();
|
||||||
assertEquals("test2", contentExchange.getResponseContent());
|
assertEquals("test2", contentExchange.getResponseContent());
|
||||||
|
|
|
@ -156,7 +156,7 @@ public class MessageListenerServlet extends MessageServletSupport {
|
||||||
Map<MessageAvailableConsumer, String> consumerDestinationMap = getConsumerDestinationNameMap(request);
|
Map<MessageAvailableConsumer, String> consumerDestinationMap = getConsumerDestinationNameMap(request);
|
||||||
client.closeConsumer(destination); // drop any existing
|
client.closeConsumer(destination); // drop any existing
|
||||||
// consumer.
|
// consumer.
|
||||||
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.SELECTOR_NAME));
|
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
|
||||||
|
|
||||||
consumer.setAvailableListener(listener);
|
consumer.setAvailableListener(listener);
|
||||||
consumerIdMap.put(consumer, message);
|
consumerIdMap.put(consumer, message);
|
||||||
|
@ -167,7 +167,7 @@ public class MessageListenerServlet extends MessageServletSupport {
|
||||||
} else if ("unlisten".equals(type)) {
|
} else if ("unlisten".equals(type)) {
|
||||||
Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
|
Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
|
||||||
Map consumerDestinationMap = getConsumerDestinationNameMap(request);
|
Map consumerDestinationMap = getConsumerDestinationNameMap(request);
|
||||||
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.SELECTOR_NAME));
|
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
|
||||||
|
|
||||||
consumer.setAvailableListener(null);
|
consumer.setAvailableListener(null);
|
||||||
consumerIdMap.remove(consumer);
|
consumerIdMap.remove(consumer);
|
||||||
|
|
|
@ -180,7 +180,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
|
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.SELECTOR_NAME));
|
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
|
||||||
Continuation continuation = null;
|
Continuation continuation = null;
|
||||||
Listener listener = null;
|
Listener listener = null;
|
||||||
Message message = null;
|
Message message = null;
|
||||||
|
@ -297,7 +297,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
|
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.SELECTOR_NAME));
|
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
|
||||||
Message message = null;
|
Message message = null;
|
||||||
|
|
||||||
// write a responds
|
// write a responds
|
||||||
|
|
|
@ -355,6 +355,6 @@ public abstract class MessageServletSupport extends HttpServlet {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getSelector(HttpServletRequest request) throws IOException {
|
protected String getSelector(HttpServletRequest request) throws IOException {
|
||||||
return request.getHeader(WebClient.SELECTOR_NAME);
|
return request.getHeader(WebClient.selectorName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
|
||||||
public static final String CONNECTION_FACTORY_PREFETCH_PARAM = "org.apache.activemq.connectionFactory.prefetch";
|
public static final String CONNECTION_FACTORY_PREFETCH_PARAM = "org.apache.activemq.connectionFactory.prefetch";
|
||||||
public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM = "org.apache.activemq.connectionFactory.optimizeAck";
|
public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM = "org.apache.activemq.connectionFactory.optimizeAck";
|
||||||
public static final String BROKER_URL_INIT_PARAM = "org.apache.activemq.brokerURL";
|
public static final String BROKER_URL_INIT_PARAM = "org.apache.activemq.brokerURL";
|
||||||
public static final String SELECTOR_NAME = "org.apache.activemq.selector";
|
public static final String SELECTOR_NAME = "org.apache.activemq.selectorName";
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(WebClient.class);
|
private static final Log LOG = LogFactory.getLog(WebClient.class);
|
||||||
|
|
||||||
|
@ -81,6 +81,7 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
|
||||||
private transient Session session;
|
private transient Session session;
|
||||||
private transient MessageProducer producer;
|
private transient MessageProducer producer;
|
||||||
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
|
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
|
||||||
|
public static String selectorName;
|
||||||
|
|
||||||
private final Semaphore semaphore = new Semaphore(1);
|
private final Semaphore semaphore = new Semaphore(1);
|
||||||
|
|
||||||
|
@ -122,6 +123,12 @@ public class WebClient implements HttpSessionActivationListener, HttpSessionBind
|
||||||
public static void initContext(ServletContext context) {
|
public static void initContext(ServletContext context) {
|
||||||
initConnectionFactory(context);
|
initConnectionFactory(context);
|
||||||
context.setAttribute("webClients", new HashMap<String, WebClient>());
|
context.setAttribute("webClients", new HashMap<String, WebClient>());
|
||||||
|
if (selectorName == null) {
|
||||||
|
selectorName = context.getInitParameter(SELECTOR_NAME);
|
||||||
|
}
|
||||||
|
if (selectorName == null) {
|
||||||
|
selectorName = "selector";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getDeliveryMode() {
|
public int getDeliveryMode() {
|
||||||
|
|
Loading…
Reference in New Issue