mirror of https://github.com/apache/activemq.git
and http://issues.apache.org/activemq/browse/AMQ-807 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@420705 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2fe01ce355
commit
ff30070bea
|
@ -16,27 +16,27 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.http;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.Response;
|
||||
import org.apache.activemq.transport.FutureResponse;
|
||||
import org.apache.activemq.transport.util.TextWireFormat;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IdGenerator;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.commons.httpclient.Header;
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
import org.apache.commons.httpclient.HttpMethod;
|
||||
import org.apache.commons.httpclient.HttpStatus;
|
||||
import org.apache.commons.httpclient.methods.GetMethod;
|
||||
import org.apache.commons.httpclient.methods.HeadMethod;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
* A HTTP {@link org.apache.activemq.transport.TransportChannel} which uses the <a
|
||||
* href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a>
|
||||
|
@ -46,14 +46,16 @@ import java.net.URI;
|
|||
*/
|
||||
public class HttpClientTransport extends HttpTransportSupport {
|
||||
private static final Log log = LogFactory.getLog(HttpClientTransport.class);
|
||||
|
||||
public static final int MAX_CLIENT_TIMEOUT = 30000;
|
||||
|
||||
private static final IdGenerator clientIdGenerator = new IdGenerator();
|
||||
|
||||
private HttpClient sendHttpClient;
|
||||
private HttpClient receiveHttpClient;
|
||||
private String clientID;
|
||||
// private String sessionID;
|
||||
|
||||
|
||||
private final String clientID = clientIdGenerator.generateId();
|
||||
private boolean trace;
|
||||
|
||||
public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
|
||||
super(wireFormat, remoteUrl);
|
||||
}
|
||||
|
@ -63,19 +65,23 @@ public class HttpClientTransport extends HttpTransportSupport {
|
|||
}
|
||||
|
||||
public void oneway(Command command) throws IOException {
|
||||
if (command.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE)
|
||||
clientID = ((ConnectionInfo) command).getClientId();
|
||||
|
||||
|
||||
if( isStopped() ) {
|
||||
throw new IOException("stopped.");
|
||||
}
|
||||
|
||||
PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
|
||||
configureMethod(httpMethod);
|
||||
httpMethod.setRequestBody(getTextWireFormat().toString(command));
|
||||
try {
|
||||
|
||||
HttpClient client = getSendHttpClient();
|
||||
client.setTimeout(MAX_CLIENT_TIMEOUT);
|
||||
int answer = client.executeMethod(httpMethod);
|
||||
if (answer != HttpStatus.SC_OK) {
|
||||
throw new IOException("Failed to post command: " + command + " as response was: " + answer);
|
||||
}
|
||||
|
||||
// checkSession(httpMethod);
|
||||
} catch (IOException e) {
|
||||
throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
|
||||
|
@ -90,10 +96,12 @@ public class HttpClientTransport extends HttpTransportSupport {
|
|||
}
|
||||
|
||||
public void run() {
|
||||
|
||||
log.trace("HTTP GET consumer thread starting: " + this);
|
||||
HttpClient httpClient = getReceiveHttpClient();
|
||||
URI remoteUrl = getRemoteUrl();
|
||||
while (!isStopped()) {
|
||||
|
||||
while ( !isStopped() && !isStopping() ) {
|
||||
|
||||
GetMethod httpMethod = new GetMethod(remoteUrl.toString());
|
||||
configureMethod(httpMethod);
|
||||
|
@ -102,25 +110,34 @@ public class HttpClientTransport extends HttpTransportSupport {
|
|||
int answer = httpClient.executeMethod(httpMethod);
|
||||
if (answer != HttpStatus.SC_OK) {
|
||||
if (answer == HttpStatus.SC_REQUEST_TIMEOUT) {
|
||||
log.info("GET timed out");
|
||||
log.debug("GET timed out");
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
onException(new InterruptedIOException());
|
||||
break;
|
||||
}
|
||||
}
|
||||
else {
|
||||
log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
|
||||
onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
|
||||
break;
|
||||
}
|
||||
}
|
||||
else {
|
||||
// checkSession(httpMethod);
|
||||
Command command = getTextWireFormat().readCommand(new DataInputStream(httpMethod.getResponseBodyAsStream()));
|
||||
DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
|
||||
|
||||
Command command = getTextWireFormat().readCommand(stream);
|
||||
if (command == null) {
|
||||
log.warn("Received null command from url: " + remoteUrl);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
doConsume(command);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
|
||||
onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl+" Reason: "+e.getMessage(),e));
|
||||
break;
|
||||
} finally {
|
||||
httpMethod.getResponseBody();
|
||||
httpMethod.releaseConnection();
|
||||
|
@ -154,8 +171,24 @@ public class HttpClientTransport extends HttpTransportSupport {
|
|||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
protected void doStart() throws Exception {
|
||||
|
||||
log.trace("HTTP GET consumer thread starting: " + this);
|
||||
HttpClient httpClient = getReceiveHttpClient();
|
||||
URI remoteUrl = getRemoteUrl();
|
||||
|
||||
HeadMethod httpMethod = new HeadMethod(remoteUrl.toString());
|
||||
configureMethod(httpMethod);
|
||||
|
||||
int answer = httpClient.executeMethod(httpMethod);
|
||||
if (answer != HttpStatus.SC_OK) {
|
||||
throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
|
||||
}
|
||||
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
// TODO
|
||||
}
|
||||
|
||||
protected HttpClient createHttpClient() {
|
||||
|
@ -163,15 +196,17 @@ public class HttpClientTransport extends HttpTransportSupport {
|
|||
}
|
||||
|
||||
protected void configureMethod(HttpMethod method) {
|
||||
// if (sessionID != null) {
|
||||
// method.addRequestHeader("Cookie", "JSESSIONID=" + sessionID);
|
||||
// }
|
||||
// else
|
||||
if (clientID != null) {
|
||||
method.setRequestHeader("clientID", clientID);
|
||||
}
|
||||
method.setRequestHeader("clientID", clientID);
|
||||
}
|
||||
|
||||
public boolean isTrace() {
|
||||
return trace;
|
||||
}
|
||||
|
||||
public void setTrace(boolean trace) {
|
||||
this.trace = trace;
|
||||
}
|
||||
|
||||
// protected void checkSession(HttpMethod client) {
|
||||
// Header header = client.getRequestHeader("Set-Cookie");
|
||||
// if (header != null) {
|
||||
|
|
|
@ -17,12 +17,13 @@
|
|||
package org.apache.activemq.transport.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activeio.command.WireFormat;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportLogger;
|
||||
import org.apache.activemq.transport.TransportServer;
|
||||
import org.apache.activemq.transport.util.TextWireFormat;
|
||||
import org.apache.activemq.transport.xstream.XStreamWireFormat;
|
||||
|
@ -51,10 +52,18 @@ public class HttpTransportFactory extends TransportFactory {
|
|||
return "xstream";
|
||||
}
|
||||
|
||||
protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException {
|
||||
TextWireFormat textWireFormat = asTextWireFormat(wf);
|
||||
Transport transport = new HttpClientTransport(textWireFormat, location);
|
||||
return transport;
|
||||
protected Transport createTransport(URI location, WireFormat wf) throws IOException {
|
||||
TextWireFormat textWireFormat = asTextWireFormat(wf);
|
||||
return new HttpClientTransport(textWireFormat, location);
|
||||
}
|
||||
|
||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||
HttpClientTransport httpTransport = (HttpClientTransport) super.compositeConfigure(transport, format, options);
|
||||
transport = httpTransport;
|
||||
if( httpTransport.isTrace() ) {
|
||||
transport = new TransportLogger(httpTransport);
|
||||
}
|
||||
return transport;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,10 +26,8 @@ import javax.servlet.ServletException;
|
|||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.servlet.http.HttpSession;
|
||||
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.WireFormatInfo;
|
||||
import org.apache.activemq.transport.TransportAcceptListener;
|
||||
import org.apache.activemq.transport.util.TextWireFormat;
|
||||
|
@ -67,30 +65,34 @@ public class HttpTunnelServlet extends HttpServlet {
|
|||
wireFormat = createWireFormat();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||
createTransportChannel(request, response);
|
||||
}
|
||||
|
||||
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||
// lets return the next response
|
||||
Command packet = null;
|
||||
int count=0;
|
||||
try {
|
||||
BlockingQueueTransport transportChannel = getTransportChannel(request);
|
||||
if (transportChannel == null) {
|
||||
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "clientID not specified.");
|
||||
log("No transport available! ");
|
||||
BlockingQueueTransport transportChannel = getTransportChannel(request, response);
|
||||
if (transportChannel == null)
|
||||
return;
|
||||
}
|
||||
|
||||
packet = (Command) transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
|
||||
|
||||
DataOutputStream stream = new DataOutputStream(response.getOutputStream());
|
||||
// while( packet !=null ) {
|
||||
wireFormat.marshal(packet, stream);
|
||||
count++;
|
||||
// packet = (Command) transportChannel.getQueue().poll(0, TimeUnit.MILLISECONDS);
|
||||
// }
|
||||
|
||||
} catch (InterruptedException ignore) {
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
if (packet == null) {
|
||||
// TODO temporary hack to prevent busy loop. Replace with continuations
|
||||
try{ Thread.sleep(250);}catch (InterruptedException e) { e.printStackTrace(); }
|
||||
if (count == 0) {
|
||||
response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
|
||||
}
|
||||
else {
|
||||
wireFormat.marshal(packet, new DataOutputStream(response.getOutputStream()));
|
||||
}
|
||||
}
|
||||
|
||||
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||
|
@ -104,20 +106,13 @@ public class HttpTunnelServlet extends HttpServlet {
|
|||
response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion());
|
||||
}
|
||||
|
||||
}
|
||||
else {
|
||||
if (command instanceof ConnectionInfo) {
|
||||
ConnectionInfo info = (ConnectionInfo) command;
|
||||
request.getSession(true).setAttribute("clientID", info.getClientId());
|
||||
}
|
||||
} else {
|
||||
|
||||
BlockingQueueTransport transport = getTransportChannel(request);
|
||||
if (transport == null) {
|
||||
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
|
||||
}
|
||||
else {
|
||||
transport.doConsume(command);
|
||||
}
|
||||
BlockingQueueTransport transport = getTransportChannel(request, response);
|
||||
if (transport == null)
|
||||
return;
|
||||
|
||||
transport.doConsume(command);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -142,39 +137,43 @@ public class HttpTunnelServlet extends HttpServlet {
|
|||
return buffer.toString();
|
||||
}
|
||||
|
||||
protected BlockingQueueTransport getTransportChannel(HttpServletRequest request) {
|
||||
HttpSession session = request.getSession(true);
|
||||
String clientID = null;
|
||||
if (session != null) {
|
||||
clientID = (String) session.getAttribute("clientID");
|
||||
}
|
||||
protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
|
||||
String clientID = request.getHeader("clientID");
|
||||
if (clientID == null) {
|
||||
clientID = request.getHeader("clientID");
|
||||
}
|
||||
/**
|
||||
* if (clientID == null) { clientID = request.getParameter("clientID"); }
|
||||
*/
|
||||
if (clientID == null) {
|
||||
log.warn("No clientID header so ignoring request");
|
||||
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
|
||||
log.warn("No clientID header specified");
|
||||
return null;
|
||||
}
|
||||
synchronized (this) {
|
||||
BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID);
|
||||
if (answer == null) {
|
||||
answer = createTransportChannel();
|
||||
clients.put(clientID, answer);
|
||||
listener.onAccept(answer);
|
||||
log.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: "+clientID);
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
/*
|
||||
try {
|
||||
answer.oneway(ping);
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn("Failed to ping transport: " + e, e);
|
||||
}
|
||||
*/
|
||||
return answer;
|
||||
}
|
||||
}
|
||||
|
||||
protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
|
||||
String clientID = request.getHeader("clientID");
|
||||
|
||||
if (clientID == null) {
|
||||
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
|
||||
log.warn("No clientID header specified");
|
||||
return null;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID);
|
||||
if (answer != null) {
|
||||
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '"+clientID+"' has allready been established");
|
||||
log.warn("A session for clientID '"+clientID+"' has allready been established");
|
||||
return null;
|
||||
}
|
||||
|
||||
answer = createTransportChannel();
|
||||
clients.put(clientID, answer);
|
||||
listener.onAccept(answer);
|
||||
return answer;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue