diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java index f3116ca258..e142edcac4 100755 --- a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java @@ -16,27 +16,27 @@ */ package org.apache.activemq.transport.http; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.URI; + import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; -import org.apache.commons.httpclient.Header; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpMethod; import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.HeadMethod; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.DataInputStream; -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URI; - /** * A HTTP {@link org.apache.activemq.transport.TransportChannel} which uses the commons-httpclient @@ -46,14 +46,16 @@ import java.net.URI; */ public class HttpClientTransport extends HttpTransportSupport { private static final Log log = LogFactory.getLog(HttpClientTransport.class); - public static final int MAX_CLIENT_TIMEOUT = 30000; + private static final IdGenerator clientIdGenerator = new IdGenerator(); + private HttpClient sendHttpClient; private HttpClient receiveHttpClient; - private String clientID; -// private String sessionID; - + + private final String clientID = clientIdGenerator.generateId(); + private boolean trace; + public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { super(wireFormat, remoteUrl); } @@ -63,19 +65,23 @@ public class HttpClientTransport extends HttpTransportSupport { } public void oneway(Command command) throws IOException { - if (command.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) - clientID = ((ConnectionInfo) command).getClientId(); - + + if( isStopped() ) { + throw new IOException("stopped."); + } + PostMethod httpMethod = new PostMethod(getRemoteUrl().toString()); configureMethod(httpMethod); httpMethod.setRequestBody(getTextWireFormat().toString(command)); try { + HttpClient client = getSendHttpClient(); client.setTimeout(MAX_CLIENT_TIMEOUT); int answer = client.executeMethod(httpMethod); if (answer != HttpStatus.SC_OK) { throw new IOException("Failed to post command: " + command + " as response was: " + answer); } + // checkSession(httpMethod); } catch (IOException e) { throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); @@ -90,10 +96,12 @@ public class HttpClientTransport extends HttpTransportSupport { } public void run() { + log.trace("HTTP GET consumer thread starting: " + this); HttpClient httpClient = getReceiveHttpClient(); URI remoteUrl = getRemoteUrl(); - while (!isStopped()) { + + while ( !isStopped() && !isStopping() ) { GetMethod httpMethod = new GetMethod(remoteUrl.toString()); configureMethod(httpMethod); @@ -102,25 +110,34 @@ public class HttpClientTransport extends HttpTransportSupport { int answer = httpClient.executeMethod(httpMethod); if (answer != HttpStatus.SC_OK) { if (answer == HttpStatus.SC_REQUEST_TIMEOUT) { - log.info("GET timed out"); + log.debug("GET timed out"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + onException(new InterruptedIOException()); + break; + } } else { - log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer); + onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer)); + break; } } else { // checkSession(httpMethod); - Command command = getTextWireFormat().readCommand(new DataInputStream(httpMethod.getResponseBodyAsStream())); + DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream()); + + Command command = getTextWireFormat().readCommand(stream); if (command == null) { log.warn("Received null command from url: " + remoteUrl); - } - else { + } else { doConsume(command); } } } catch (IOException e) { - log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e); + onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl+" Reason: "+e.getMessage(),e)); + break; } finally { httpMethod.getResponseBody(); httpMethod.releaseConnection(); @@ -154,8 +171,24 @@ public class HttpClientTransport extends HttpTransportSupport { // Implementation methods // ------------------------------------------------------------------------- + protected void doStart() throws Exception { + + log.trace("HTTP GET consumer thread starting: " + this); + HttpClient httpClient = getReceiveHttpClient(); + URI remoteUrl = getRemoteUrl(); + + HeadMethod httpMethod = new HeadMethod(remoteUrl.toString()); + configureMethod(httpMethod); + + int answer = httpClient.executeMethod(httpMethod); + if (answer != HttpStatus.SC_OK) { + throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer); + } + + super.doStart(); + } + protected void doStop(ServiceStopper stopper) throws Exception { - // TODO } protected HttpClient createHttpClient() { @@ -163,15 +196,17 @@ public class HttpClientTransport extends HttpTransportSupport { } protected void configureMethod(HttpMethod method) { -// if (sessionID != null) { -// method.addRequestHeader("Cookie", "JSESSIONID=" + sessionID); -// } -// else - if (clientID != null) { - method.setRequestHeader("clientID", clientID); - } + method.setRequestHeader("clientID", clientID); } + public boolean isTrace() { + return trace; + } + + public void setTrace(boolean trace) { + this.trace = trace; + } + // protected void checkSession(HttpMethod client) { // Header header = client.getRequestHeader("Set-Cookie"); // if (header != null) { diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java index e22e38c683..3de37c53f8 100755 --- a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java @@ -17,12 +17,13 @@ package org.apache.activemq.transport.http; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URI; +import java.util.Map; import org.apache.activeio.command.WireFormat; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.transport.xstream.XStreamWireFormat; @@ -51,10 +52,18 @@ public class HttpTransportFactory extends TransportFactory { return "xstream"; } - protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException { - TextWireFormat textWireFormat = asTextWireFormat(wf); - Transport transport = new HttpClientTransport(textWireFormat, location); - return transport; + protected Transport createTransport(URI location, WireFormat wf) throws IOException { + TextWireFormat textWireFormat = asTextWireFormat(wf); + return new HttpClientTransport(textWireFormat, location); + } + + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + HttpClientTransport httpTransport = (HttpClientTransport) super.compositeConfigure(transport, format, options); + transport = httpTransport; + if( httpTransport.isTrace() ) { + transport = new TransportLogger(httpTransport); + } + return transport; } } diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java index f8f86455ca..6d9c04c347 100755 --- a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java @@ -26,10 +26,8 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.servlet.http.HttpSession; import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.util.TextWireFormat; @@ -67,30 +65,34 @@ public class HttpTunnelServlet extends HttpServlet { wireFormat = createWireFormat(); } } - + + protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + createTransportChannel(request, response); + } + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // lets return the next response Command packet = null; + int count=0; try { - BlockingQueueTransport transportChannel = getTransportChannel(request); - if (transportChannel == null) { - response.sendError(HttpServletResponse.SC_BAD_REQUEST, "clientID not specified."); - log("No transport available! "); + BlockingQueueTransport transportChannel = getTransportChannel(request, response); + if (transportChannel == null) return; - } + packet = (Command) transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS); + + DataOutputStream stream = new DataOutputStream(response.getOutputStream()); +// while( packet !=null ) { + wireFormat.marshal(packet, stream); + count++; +// packet = (Command) transportChannel.getQueue().poll(0, TimeUnit.MILLISECONDS); +// } + + } catch (InterruptedException ignore) { } - catch (InterruptedException e) { - // ignore - } - if (packet == null) { - // TODO temporary hack to prevent busy loop. Replace with continuations - try{ Thread.sleep(250);}catch (InterruptedException e) { e.printStackTrace(); } + if (count == 0) { response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT); } - else { - wireFormat.marshal(packet, new DataOutputStream(response.getOutputStream())); - } } protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { @@ -104,20 +106,13 @@ public class HttpTunnelServlet extends HttpServlet { response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion()); } - } - else { - if (command instanceof ConnectionInfo) { - ConnectionInfo info = (ConnectionInfo) command; - request.getSession(true).setAttribute("clientID", info.getClientId()); - } + } else { - BlockingQueueTransport transport = getTransportChannel(request); - if (transport == null) { - response.setStatus(HttpServletResponse.SC_NOT_FOUND); - } - else { - transport.doConsume(command); - } + BlockingQueueTransport transport = getTransportChannel(request, response); + if (transport == null) + return; + + transport.doConsume(command); } } @@ -142,39 +137,43 @@ public class HttpTunnelServlet extends HttpServlet { return buffer.toString(); } - protected BlockingQueueTransport getTransportChannel(HttpServletRequest request) { - HttpSession session = request.getSession(true); - String clientID = null; - if (session != null) { - clientID = (String) session.getAttribute("clientID"); - } + protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException { + String clientID = request.getHeader("clientID"); if (clientID == null) { - clientID = request.getHeader("clientID"); - } - /** - * if (clientID == null) { clientID = request.getParameter("clientID"); } - */ - if (clientID == null) { - log.warn("No clientID header so ignoring request"); + response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified"); + log.warn("No clientID header specified"); return null; } synchronized (this) { BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID); if (answer == null) { - answer = createTransportChannel(); - clients.put(clientID, answer); - listener.onAccept(answer); + log.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: "+clientID); + return null; } - else { - /* - try { - answer.oneway(ping); - } - catch (IOException e) { - log.warn("Failed to ping transport: " + e, e); - } - */ + return answer; + } + } + + protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException { + String clientID = request.getHeader("clientID"); + + if (clientID == null) { + response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified"); + log.warn("No clientID header specified"); + return null; + } + + synchronized (this) { + BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID); + if (answer != null) { + response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '"+clientID+"' has allready been established"); + log.warn("A session for clientID '"+clientID+"' has allready been established"); + return null; } + + answer = createTransportChannel(); + clients.put(clientID, answer); + listener.onAccept(answer); return answer; } }