mirror of https://github.com/apache/activemq.git
AMQ-4277: Added unit test.
This commit is contained in:
parent
d07a5a9dc6
commit
987769a62b
|
@ -19,23 +19,21 @@ package org.apache.activemq.web;
|
|||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.nio.SelectChannelConnector;
|
||||
import org.eclipse.jetty.webapp.WebAppContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class JettyTestSupport extends TestCase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JettyTestSupport.class);
|
||||
|
@ -50,10 +48,15 @@ public class JettyTestSupport extends TestCase {
|
|||
URI tcpUri;
|
||||
URI stompUri;
|
||||
|
||||
protected boolean isPersistent() {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setBrokerName("amq-broker");
|
||||
broker.setPersistent(false);
|
||||
broker.setPersistent(isPersistent());
|
||||
broker.setDataDirectory("target/activemq-data");
|
||||
broker.setUseJmx(true);
|
||||
tcpUri = new URI(broker.addConnector("tcp://localhost:0").getPublishableConnectString());
|
||||
stompUri = new URI(broker.addConnector("stomp://localhost:0").getPublishableConnectString());
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.web;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
||||
import org.eclipse.jetty.client.ContentExchange;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.junit.Ignore;
|
||||
|
||||
public class RestPersistentTest extends JettyTestSupport {
|
||||
|
||||
@Override
|
||||
protected boolean isPersistent() {
|
||||
// need persistent for post/get
|
||||
return true;
|
||||
}
|
||||
|
||||
public void testPostAndGetWithQueue() throws Exception {
|
||||
postAndGet("queue");
|
||||
}
|
||||
|
||||
public void testPostAndGetWithTopic() throws Exception {
|
||||
// TODO: problems with topics
|
||||
// postAndGet("topic");
|
||||
}
|
||||
|
||||
public void postAndGet(String destinationType) throws Exception {
|
||||
|
||||
final String urlGET="http://localhost:8080/message/upcTest?clientId=consumer1&readTimeout=5000&type="+destinationType;
|
||||
final String urlPOST="http://localhost:8080/message/upcTest?type="+destinationType;
|
||||
|
||||
final String message1="<itemPolicy><upc>1001</upc></itemPolicy>";
|
||||
final String property1="terminalNumber=lane1";
|
||||
final String selector1="terminalNumber='lane1'";
|
||||
|
||||
HttpClient httpClient = new HttpClient();
|
||||
httpClient.start();
|
||||
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
|
||||
|
||||
//post first message
|
||||
// TODO: a problem with GET before POST
|
||||
// getMessage(httpClient, urlGET, selector1, null); //should NOT receive message1
|
||||
postMessage(httpClient, urlPOST, property1, message1);
|
||||
getMessage(httpClient, urlGET, selector1, message1); //should receive message1
|
||||
}
|
||||
|
||||
private void postMessage(HttpClient httpClient, String url, String properties, String message) throws Exception
|
||||
{
|
||||
ContentExchange contentExchange = new ContentExchange();
|
||||
contentExchange.setMethod("POST");
|
||||
contentExchange.setURL(url+"&"+properties);
|
||||
//contentExchange.setRequestHeader("accept", "text/xml");
|
||||
contentExchange.setRequestHeader("Content-Type","text/xml");
|
||||
contentExchange.setRequestContentSource(new ByteArrayInputStream(message.getBytes("UTF-8")));
|
||||
|
||||
httpClient.send(contentExchange);
|
||||
contentExchange.waitForDone();
|
||||
assertTrue("success status", HttpStatus.isSuccess(contentExchange.getResponseStatus()));
|
||||
}
|
||||
|
||||
private void getMessage(HttpClient httpClient, String url, String selector, String expectedMessage) throws Exception
|
||||
{
|
||||
ContentExchange contentExchange = new ContentExchange(true);
|
||||
contentExchange.setURL(url);
|
||||
contentExchange.setRequestHeader("accept", "text/xml");
|
||||
contentExchange.setRequestHeader("Content-Type","text/xml");
|
||||
if(selector!=null)
|
||||
{
|
||||
contentExchange.setRequestHeader("selector", selector);
|
||||
}
|
||||
httpClient.send(contentExchange);
|
||||
contentExchange.waitForDone();
|
||||
assertTrue("success status", HttpStatus.isSuccess(contentExchange.getResponseStatus()));
|
||||
|
||||
if(expectedMessage!=null)
|
||||
{
|
||||
assertNotNull(contentExchange.getResponseContent());
|
||||
assertEquals(expectedMessage, contentExchange.getResponseContent().trim());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -35,8 +35,6 @@ import javax.servlet.http.HttpServletResponse;
|
|||
|
||||
import org.apache.activemq.MessageAvailableConsumer;
|
||||
import org.apache.activemq.MessageAvailableListener;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.eclipse.jetty.continuation.Continuation;
|
||||
import org.eclipse.jetty.continuation.ContinuationSupport;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -196,7 +194,7 @@ public class MessageServlet extends MessageServletSupport {
|
|||
Continuation continuation = null;
|
||||
Listener listener = null;
|
||||
|
||||
// Look for any available messages
|
||||
// Look for any available messages (need a little timeout)
|
||||
message = consumer.receive(10);
|
||||
|
||||
// Get an existing Continuation or create a new one if there are
|
||||
|
@ -392,9 +390,10 @@ public class MessageServlet extends MessageServletSupport {
|
|||
Message message = consumer.receiveNoWait();
|
||||
continuation.setAttribute("message", message);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error receiving message " + e, e);
|
||||
LOG.warn("Error receiving message due " + e.getMessage() + ". This exception is ignored.", e);
|
||||
} finally {
|
||||
continuation.resume();
|
||||
}
|
||||
continuation.resume();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue