diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java index 7404ce3ed1b..563c92dc904 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -129,6 +129,15 @@ public abstract class MultiplexHttpDestination extends Htt return true; } + @Override + public void close() + { + super.close(); + C connection = this.connection; + if (connection != null) + connection.close(); + } + @Override public void close(Connection connection) { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 219a56ee047..0b1b1b0fcf2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.client.http; +import java.nio.channels.AsynchronousCloseException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -85,13 +86,8 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec protected boolean onReadTimeout() { LOG.debug("{} idle timeout", this); - - HttpExchange exchange = channel.getHttpExchange(); - if (exchange != null) - return exchange.getRequest().abort(new TimeoutException()); - - getHttpDestination().close(this); - return true; + close(new TimeoutException()); + return false; } @Override @@ -119,14 +115,23 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec @Override public void close() + { + close(new AsynchronousCloseException()); + } + + protected void close(Throwable failure) { if (softClose()) { + // First close then abort, to be sure that the connection cannot be reused + // from an onFailure() handler or by blocking code waiting for completion. getHttpDestination().close(this); getEndPoint().shutdownOutput(); LOG.debug("{} oshut", this); getEndPoint().close(); LOG.debug("{} closed", this); + + abort(failure); } } @@ -135,6 +140,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec return closed.compareAndSet(false, true); } + private boolean abort(Throwable failure) + { + HttpExchange exchange = channel.getHttpExchange(); + return exchange != null && exchange.getRequest().abort(failure); + } + @Override public String toString() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index f8894f8ab85..76efb9e3bdc 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -127,8 +127,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res // Shutting down the parser may invoke messageComplete() or earlyEOF() parser.atEOF(); parser.parseNext(BufferUtil.EMPTY_BUFFER); - if (!responseFailure(new EOFException())) - getHttpConnection().close(); + getHttpConnection().close(new EOFException()); } @Override diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index 1b7e3ee42ca..5d19a1eccf2 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -1161,4 +1161,40 @@ public class HttpClientTest extends AbstractHttpClientServerTest Assert.assertEquals(200, response.getStatus()); Assert.assertTrue(response.getHeaders().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString())); } + + @Test + public void testLongPollIsAbortedWhenClientIsStopped() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + request.startAsync(); + latch.countDown(); + } + }); + + final CountDownLatch completeLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .send(new Response.CompleteListener() + { + @Override + public void onComplete(Result result) + { + if (result.isFailed()) + completeLatch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + + // Stop the client, the complete listener must be invoked. + client.stop(); + + Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java deleted file mode 100644 index 86c0b64185e..00000000000 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java +++ /dev/null @@ -1,220 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.client; - -public class HttpReceiverTest -{ -// @Rule -// public final TestTracker tracker = new TestTracker(); -// -// private HttpClient client; -// private HttpDestination destination; -// private ByteArrayEndPoint endPoint; -// private HttpConnection connection; -// private HttpConversation conversation; -// -// @Before -// public void init() throws Exception -// { -// client = new HttpClient(); -// client.start(); -// destination = new HttpDestination(client, "http", "localhost", 8080); -// endPoint = new ByteArrayEndPoint(); -// connection = new HttpConnection(client, endPoint, destination); -// conversation = new HttpConversation(client, 1); -// } -// -// @After -// public void destroy() throws Exception -// { -// client.stop(); -// } -// -// protected HttpExchange newExchange() -// { -// HttpRequest request = new HttpRequest(client, URI.create("http://localhost")); -// FutureResponseListener listener = new FutureResponseListener(request); -// HttpExchange exchange = new HttpExchange(conversation, destination, request, Collections.singletonList(listener)); -// conversation.getExchanges().offer(exchange); -// connection.associate(exchange); -// exchange.requestComplete(); -// exchange.terminateRequest(); -// return exchange; -// } -// -// @Test -// public void test_Receive_NoResponseContent() throws Exception -// { -// endPoint.setInput("" + -// "HTTP/1.1 200 OK\r\n" + -// "Content-length: 0\r\n" + -// "\r\n"); -// HttpExchange exchange = newExchange(); -// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); -// connection.receive(); -// -// Response response = listener.get(5, TimeUnit.SECONDS); -// Assert.assertNotNull(response); -// Assert.assertEquals(200, response.getStatus()); -// Assert.assertEquals("OK", response.getReason()); -// Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion()); -// HttpFields headers = response.getHeaders(); -// Assert.assertNotNull(headers); -// Assert.assertEquals(1, headers.size()); -// Assert.assertEquals("0", headers.get(HttpHeader.CONTENT_LENGTH)); -// } -// -// @Test -// public void test_Receive_ResponseContent() throws Exception -// { -// String content = "0123456789ABCDEF"; -// endPoint.setInput("" + -// "HTTP/1.1 200 OK\r\n" + -// "Content-length: " + content.length() + "\r\n" + -// "\r\n" + -// content); -// HttpExchange exchange = newExchange(); -// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); -// connection.receive(); -// -// Response response = listener.get(5, TimeUnit.SECONDS); -// Assert.assertNotNull(response); -// Assert.assertEquals(200, response.getStatus()); -// Assert.assertEquals("OK", response.getReason()); -// Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion()); -// HttpFields headers = response.getHeaders(); -// Assert.assertNotNull(headers); -// Assert.assertEquals(1, headers.size()); -// Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH)); -// String received = listener.getContentAsString(StandardCharsets.UTF_8); -// Assert.assertEquals(content, received); -// } -// -// @Test -// public void test_Receive_ResponseContent_EarlyEOF() throws Exception -// { -// String content1 = "0123456789"; -// String content2 = "ABCDEF"; -// endPoint.setInput("" + -// "HTTP/1.1 200 OK\r\n" + -// "Content-length: " + (content1.length() + content2.length()) + "\r\n" + -// "\r\n" + -// content1); -// HttpExchange exchange = newExchange(); -// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); -// connection.receive(); -// endPoint.setInputEOF(); -// connection.receive(); -// -// try -// { -// listener.get(5, TimeUnit.SECONDS); -// Assert.fail(); -// } -// catch (ExecutionException e) -// { -// Assert.assertTrue(e.getCause() instanceof EOFException); -// } -// } -// -// @Test -// public void test_Receive_ResponseContent_IdleTimeout() throws Exception -// { -// endPoint.setInput("" + -// "HTTP/1.1 200 OK\r\n" + -// "Content-length: 1\r\n" + -// "\r\n"); -// HttpExchange exchange = newExchange(); -// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); -// connection.receive(); -// // Simulate an idle timeout -// connection.idleTimeout(); -// -// try -// { -// listener.get(5, TimeUnit.SECONDS); -// Assert.fail(); -// } -// catch (ExecutionException e) -// { -// Assert.assertTrue(e.getCause() instanceof TimeoutException); -// } -// } -// -// @Test -// public void test_Receive_BadResponse() throws Exception -// { -// endPoint.setInput("" + -// "HTTP/1.1 200 OK\r\n" + -// "Content-length: A\r\n" + -// "\r\n"); -// HttpExchange exchange = newExchange(); -// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); -// connection.receive(); -// -// try -// { -// listener.get(5, TimeUnit.SECONDS); -// Assert.fail(); -// } -// catch (ExecutionException e) -// { -// Assert.assertTrue(e.getCause() instanceof HttpResponseException); -// } -// } -// -// @Test -// public void test_Receive_GZIPResponseContent_Fragmented() throws Exception -// { -// byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; -// ByteArrayOutputStream baos = new ByteArrayOutputStream(); -// try (GZIPOutputStream gzipOutput = new GZIPOutputStream(baos)) -// { -// gzipOutput.write(data); -// } -// byte[] gzip = baos.toByteArray(); -// -// endPoint.setInput("" + -// "HTTP/1.1 200 OK\r\n" + -// "Content-Length: " + gzip.length + "\r\n" + -// "Content-Encoding: gzip\r\n" + -// "\r\n"); -// HttpExchange exchange = newExchange(); -// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); -// connection.receive(); -// endPoint.reset(); -// -// ByteBuffer buffer = ByteBuffer.wrap(gzip); -// int fragment = buffer.limit() - 1; -// buffer.limit(fragment); -// endPoint.setInput(buffer); -// connection.receive(); -// endPoint.reset(); -// -// buffer.limit(gzip.length); -// buffer.position(fragment); -// endPoint.setInput(buffer); -// connection.receive(); -// -// ContentResponse response = listener.get(5, TimeUnit.SECONDS); -// Assert.assertNotNull(response); -// Assert.assertEquals(200, response.getStatus()); -// Assert.assertArrayEquals(data, response.getContent()); -// } -} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java deleted file mode 100644 index 3883ebfecd9..00000000000 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java +++ /dev/null @@ -1,280 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.client; - -public class HttpSenderTest -{ -// @Rule -// public final TestTracker tracker = new TestTracker(); -// -// private HttpClient client; -// -// @Before -// public void init() throws Exception -// { -// client = new HttpClient(); -// client.start(); -// } -// -// @After -// public void destroy() throws Exception -// { -// client.stop(); -// } -// -// @Test -// public void test_Send_NoRequestContent() throws Exception -// { -// ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); -// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080); -// HttpConnection connection = new HttpConnection(client, endPoint, destination); -// Request request = client.newRequest(URI.create("http://localhost/")); -// final CountDownLatch headersLatch = new CountDownLatch(1); -// final CountDownLatch successLatch = new CountDownLatch(1); -// request.listener(new Request.Listener.Adapter() -// { -// @Override -// public void onHeaders(Request request) -// { -// headersLatch.countDown(); -// } -// -// @Override -// public void onSuccess(Request request) -// { -// successLatch.countDown(); -// } -// }); -// connection.send(request, (Response.CompleteListener)null); -// -// String requestString = endPoint.takeOutputString(); -// Assert.assertTrue(requestString.startsWith("GET ")); -// Assert.assertTrue(requestString.endsWith("\r\n\r\n")); -// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); -// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); -// } -// -// @Slow -// @Test -// public void test_Send_NoRequestContent_IncompleteFlush() throws Exception -// { -// ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); -// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080); -// HttpConnection connection = new HttpConnection(client, endPoint, destination); -// Request request = client.newRequest(URI.create("http://localhost/")); -// connection.send(request, (Response.CompleteListener)null); -// -// // This take will free space in the buffer and allow for the write to complete -// StringBuilder builder = new StringBuilder(endPoint.takeOutputString()); -// -// // Wait for the write to complete -// TimeUnit.SECONDS.sleep(1); -// -// String chunk = endPoint.takeOutputString(); -// while (chunk.length() > 0) -// { -// builder.append(chunk); -// chunk = endPoint.takeOutputString(); -// } -// -// String requestString = builder.toString(); -// Assert.assertTrue(requestString.startsWith("GET ")); -// Assert.assertTrue(requestString.endsWith("\r\n\r\n")); -// } -// -// @Test -// public void test_Send_NoRequestContent_Exception() throws Exception -// { -// ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); -// // Shutdown output to trigger the exception on write -// endPoint.shutdownOutput(); -// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080); -// HttpConnection connection = new HttpConnection(client, endPoint, destination); -// Request request = client.newRequest(URI.create("http://localhost/")); -// final CountDownLatch failureLatch = new CountDownLatch(2); -// request.listener(new Request.Listener.Adapter() -// { -// @Override -// public void onFailure(Request request, Throwable x) -// { -// failureLatch.countDown(); -// } -// }); -// connection.send(request, new Response.Listener.Adapter() -// { -// @Override -// public void onComplete(Result result) -// { -// Assert.assertTrue(result.isFailed()); -// failureLatch.countDown(); -// } -// }); -// -// Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); -// } -// -// @Test -// public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception -// { -// ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); -// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080); -// HttpConnection connection = new HttpConnection(client, endPoint, destination); -// Request request = client.newRequest(URI.create("http://localhost/")); -// final CountDownLatch failureLatch = new CountDownLatch(2); -// request.listener(new Request.Listener.Adapter() -// { -// @Override -// public void onFailure(Request request, Throwable x) -// { -// failureLatch.countDown(); -// } -// }); -// connection.send(request, new Response.Listener.Adapter() -// { -// @Override -// public void onComplete(Result result) -// { -// Assert.assertTrue(result.isFailed()); -// failureLatch.countDown(); -// } -// }); -// -// // Shutdown output to trigger the exception on write -// endPoint.shutdownOutput(); -// // This take will free space in the buffer and allow for the write to complete -// // although it will fail because we shut down the output -// endPoint.takeOutputString(); -// -// Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); -// } -// -// @Test -// public void test_Send_SmallRequestContent_InOneBuffer() throws Exception -// { -// ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); -// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080); -// HttpConnection connection = new HttpConnection(client, endPoint, destination); -// Request request = client.newRequest(URI.create("http://localhost/")); -// String content = "abcdef"; -// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)))); -// final CountDownLatch headersLatch = new CountDownLatch(1); -// final CountDownLatch successLatch = new CountDownLatch(1); -// request.listener(new Request.Listener.Adapter() -// { -// @Override -// public void onHeaders(Request request) -// { -// headersLatch.countDown(); -// } -// -// @Override -// public void onSuccess(Request request) -// { -// successLatch.countDown(); -// } -// }); -// connection.send(request, (Response.CompleteListener)null); -// -// String requestString = endPoint.takeOutputString(); -// Assert.assertTrue(requestString.startsWith("GET ")); -// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content)); -// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); -// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); -// } -// -// @Test -// public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception -// { -// ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); -// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080); -// HttpConnection connection = new HttpConnection(client, endPoint, destination); -// Request request = client.newRequest(URI.create("http://localhost/")); -// String content1 = "0123456789"; -// String content2 = "abcdef"; -// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(content2.getBytes(StandardCharsets.UTF_8)))); -// final CountDownLatch headersLatch = new CountDownLatch(1); -// final CountDownLatch successLatch = new CountDownLatch(1); -// request.listener(new Request.Listener.Adapter() -// { -// @Override -// public void onHeaders(Request request) -// { -// headersLatch.countDown(); -// } -// -// @Override -// public void onSuccess(Request request) -// { -// successLatch.countDown(); -// } -// }); -// connection.send(request, (Response.CompleteListener)null); -// -// String requestString = endPoint.takeOutputString(); -// Assert.assertTrue(requestString.startsWith("GET ")); -// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content1 + content2)); -// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); -// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); -// } -// -// @Test -// public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception -// { -// ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); -// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080); -// HttpConnection connection = new HttpConnection(client, endPoint, destination); -// Request request = client.newRequest(URI.create("http://localhost/")); -// String content1 = "0123456789"; -// String content2 = "ABCDEF"; -// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(content2.getBytes(StandardCharsets.UTF_8))) -// { -// @Override -// public long getLength() -// { -// return -1; -// } -// }); -// final CountDownLatch headersLatch = new CountDownLatch(1); -// final CountDownLatch successLatch = new CountDownLatch(1); -// request.listener(new Request.Listener.Adapter() -// { -// @Override -// public void onHeaders(Request request) -// { -// headersLatch.countDown(); -// } -// -// @Override -// public void onSuccess(Request request) -// { -// successLatch.countDown(); -// } -// }); -// connection.send(request, (Response.CompleteListener)null); -// -// String requestString = endPoint.takeOutputString(); -// Assert.assertTrue(requestString.startsWith("GET ")); -// String content = Integer.toHexString(content1.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content1 + "\r\n"; -// content += Integer.toHexString(content2.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content2 + "\r\n"; -// content += "0\r\n\r\n"; -// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content)); -// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); -// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); -// } -} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java new file mode 100644 index 00000000000..32db8dd5eaf --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java @@ -0,0 +1,246 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client.http; + +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPOutputStream; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.HttpRequest; +import org.eclipse.jetty.client.HttpResponseException; +import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.util.FutureResponseListener; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.toolchain.test.TestTracker; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class HttpReceiverOverHTTPTest +{ + @Rule + public final TestTracker tracker = new TestTracker(); + + private HttpClient client; + private HttpDestinationOverHTTP destination; + private ByteArrayEndPoint endPoint; + private HttpConnectionOverHTTP connection; + + @Before + public void init() throws Exception + { + client = new HttpClient(); + client.start(); + destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + endPoint = new ByteArrayEndPoint(); + connection = new HttpConnectionOverHTTP(endPoint, destination); + } + + @After + public void destroy() throws Exception + { + client.stop(); + } + + protected HttpExchange newExchange() + { + HttpRequest request = (HttpRequest)client.newRequest("http://localhost"); + FutureResponseListener listener = new FutureResponseListener(request); + HttpExchange exchange = new HttpExchange(destination, request, Collections.singletonList(listener)); + connection.getHttpChannel().associate(exchange); + exchange.requestComplete(); + exchange.terminateRequest(null); + return exchange; + } + + @Test + public void test_Receive_NoResponseContent() throws Exception + { + endPoint.setInput("" + + "HTTP/1.1 200 OK\r\n" + + "Content-length: 0\r\n" + + "\r\n"); + HttpExchange exchange = newExchange(); + FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); + connection.getHttpChannel().receive(); + + Response response = listener.get(5, TimeUnit.SECONDS); + Assert.assertNotNull(response); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("OK", response.getReason()); + Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion()); + HttpFields headers = response.getHeaders(); + Assert.assertNotNull(headers); + Assert.assertEquals(1, headers.size()); + Assert.assertEquals("0", headers.get(HttpHeader.CONTENT_LENGTH)); + } + + @Test + public void test_Receive_ResponseContent() throws Exception + { + String content = "0123456789ABCDEF"; + endPoint.setInput("" + + "HTTP/1.1 200 OK\r\n" + + "Content-length: " + content.length() + "\r\n" + + "\r\n" + + content); + HttpExchange exchange = newExchange(); + FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); + connection.getHttpChannel().receive(); + + Response response = listener.get(5, TimeUnit.SECONDS); + Assert.assertNotNull(response); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("OK", response.getReason()); + Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion()); + HttpFields headers = response.getHeaders(); + Assert.assertNotNull(headers); + Assert.assertEquals(1, headers.size()); + Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH)); + String received = listener.getContentAsString(StandardCharsets.UTF_8); + Assert.assertEquals(content, received); + } + + @Test + public void test_Receive_ResponseContent_EarlyEOF() throws Exception + { + String content1 = "0123456789"; + String content2 = "ABCDEF"; + endPoint.setInput("" + + "HTTP/1.1 200 OK\r\n" + + "Content-length: " + (content1.length() + content2.length()) + "\r\n" + + "\r\n" + + content1); + HttpExchange exchange = newExchange(); + FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); + connection.getHttpChannel().receive(); + endPoint.setInputEOF(); + connection.getHttpChannel().receive(); + + try + { + listener.get(5, TimeUnit.SECONDS); + Assert.fail(); + } + catch (ExecutionException e) + { + Assert.assertTrue(e.getCause() instanceof EOFException); + } + } + + @Test + public void test_Receive_ResponseContent_IdleTimeout() throws Exception + { + endPoint.setInput("" + + "HTTP/1.1 200 OK\r\n" + + "Content-length: 1\r\n" + + "\r\n"); + HttpExchange exchange = newExchange(); + FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); + connection.getHttpChannel().receive(); + // Simulate an idle timeout + connection.onReadTimeout(); + + try + { + listener.get(5, TimeUnit.SECONDS); + Assert.fail(); + } + catch (ExecutionException e) + { + Assert.assertTrue(e.getCause() instanceof TimeoutException); + } + } + + @Test + public void test_Receive_BadResponse() throws Exception + { + endPoint.setInput("" + + "HTTP/1.1 200 OK\r\n" + + "Content-length: A\r\n" + + "\r\n"); + HttpExchange exchange = newExchange(); + FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); + connection.getHttpChannel().receive(); + + try + { + listener.get(5, TimeUnit.SECONDS); + Assert.fail(); + } + catch (ExecutionException e) + { + Assert.assertTrue(e.getCause() instanceof HttpResponseException); + } + } + + @Test + public void test_Receive_GZIPResponseContent_Fragmented() throws Exception + { + byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOutput = new GZIPOutputStream(baos)) + { + gzipOutput.write(data); + } + byte[] gzip = baos.toByteArray(); + + endPoint.setInput("" + + "HTTP/1.1 200 OK\r\n" + + "Content-Length: " + gzip.length + "\r\n" + + "Content-Encoding: gzip\r\n" + + "\r\n"); + HttpExchange exchange = newExchange(); + FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); + connection.getHttpChannel().receive(); + endPoint.reset(); + + ByteBuffer buffer = ByteBuffer.wrap(gzip); + int fragment = buffer.limit() - 1; + buffer.limit(fragment); + endPoint.setInput(buffer); + connection.getHttpChannel().receive(); + endPoint.reset(); + + buffer.limit(gzip.length); + buffer.position(fragment); + endPoint.setInput(buffer); + connection.getHttpChannel().receive(); + + ContentResponse response = listener.get(5, TimeUnit.SECONDS); + Assert.assertNotNull(response); + Assert.assertEquals(200, response.getStatus()); + Assert.assertArrayEquals(data, response.getContent()); + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java new file mode 100644 index 00000000000..afe1ec7a4c7 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java @@ -0,0 +1,302 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client.http; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.ByteBufferContentProvider; +import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class HttpSenderOverHTTPTest +{ + @Rule + public final TestTracker tracker = new TestTracker(); + + private HttpClient client; + + @Before + public void init() throws Exception + { + client = new HttpClient(); + client.start(); + } + + @After + public void destroy() throws Exception + { + client.stop(); + } + + @Test + public void test_Send_NoRequestContent() throws Exception + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); + HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination); + Request request = client.newRequest(URI.create("http://localhost/")); + final CountDownLatch headersLatch = new CountDownLatch(1); + final CountDownLatch successLatch = new CountDownLatch(1); + request.listener(new Request.Listener.Adapter() + { + @Override + public void onHeaders(Request request) + { + headersLatch.countDown(); + } + + @Override + public void onSuccess(Request request) + { + successLatch.countDown(); + } + }); + connection.send(request, null); + + String requestString = endPoint.takeOutputString(); + Assert.assertTrue(requestString.startsWith("GET ")); + Assert.assertTrue(requestString.endsWith("\r\n\r\n")); + Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); + } + + @Slow + @Test + public void test_Send_NoRequestContent_IncompleteFlush() throws Exception + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); + HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination); + Request request = client.newRequest(URI.create("http://localhost/")); + connection.send(request, null); + + // This take will free space in the buffer and allow for the write to complete + StringBuilder builder = new StringBuilder(endPoint.takeOutputString()); + + // Wait for the write to complete + TimeUnit.SECONDS.sleep(1); + + String chunk = endPoint.takeOutputString(); + while (chunk.length() > 0) + { + builder.append(chunk); + chunk = endPoint.takeOutputString(); + } + + String requestString = builder.toString(); + Assert.assertTrue(requestString.startsWith("GET ")); + Assert.assertTrue(requestString.endsWith("\r\n\r\n")); + } + + @Test + public void test_Send_NoRequestContent_Exception() throws Exception + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); + // Shutdown output to trigger the exception on write + endPoint.shutdownOutput(); + HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination); + Request request = client.newRequest(URI.create("http://localhost/")); + final CountDownLatch failureLatch = new CountDownLatch(2); + request.listener(new Request.Listener.Adapter() + { + @Override + public void onFailure(Request request, Throwable x) + { + failureLatch.countDown(); + } + }); + connection.send(request, new Response.Listener.Adapter() + { + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + failureLatch.countDown(); + } + }); + + Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); + HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination); + Request request = client.newRequest(URI.create("http://localhost/")); + final CountDownLatch failureLatch = new CountDownLatch(2); + request.listener(new Request.Listener.Adapter() + { + @Override + public void onFailure(Request request, Throwable x) + { + failureLatch.countDown(); + } + }); + connection.send(request, new Response.Listener.Adapter() + { + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + failureLatch.countDown(); + } + }); + + // Shutdown output to trigger the exception on write + endPoint.shutdownOutput(); + // This take will free space in the buffer and allow for the write to complete + // although it will fail because we shut down the output + endPoint.takeOutputString(); + + Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void test_Send_SmallRequestContent_InOneBuffer() throws Exception + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); + HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination); + Request request = client.newRequest(URI.create("http://localhost/")); + String content = "abcdef"; + request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)))); + final CountDownLatch headersLatch = new CountDownLatch(1); + final CountDownLatch successLatch = new CountDownLatch(1); + request.listener(new Request.Listener.Adapter() + { + @Override + public void onHeaders(Request request) + { + headersLatch.countDown(); + } + + @Override + public void onSuccess(Request request) + { + successLatch.countDown(); + } + }); + connection.send(request, null); + + String requestString = endPoint.takeOutputString(); + Assert.assertTrue(requestString.startsWith("GET ")); + Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content)); + Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); + HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination); + Request request = client.newRequest(URI.create("http://localhost/")); + String content1 = "0123456789"; + String content2 = "abcdef"; + request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(content2.getBytes(StandardCharsets.UTF_8)))); + final CountDownLatch headersLatch = new CountDownLatch(1); + final CountDownLatch successLatch = new CountDownLatch(1); + request.listener(new Request.Listener.Adapter() + { + @Override + public void onHeaders(Request request) + { + headersLatch.countDown(); + } + + @Override + public void onSuccess(Request request) + { + successLatch.countDown(); + } + }); + connection.send(request, null); + + String requestString = endPoint.takeOutputString(); + Assert.assertTrue(requestString.startsWith("GET ")); + Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content1 + content2)); + Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); + HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination); + Request request = client.newRequest(URI.create("http://localhost/")); + String content1 = "0123456789"; + String content2 = "ABCDEF"; + request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(content2.getBytes(StandardCharsets.UTF_8))) + { + @Override + public long getLength() + { + return -1; + } + }); + final CountDownLatch headersLatch = new CountDownLatch(1); + final CountDownLatch successLatch = new CountDownLatch(1); + request.listener(new Request.Listener.Adapter() + { + @Override + public void onHeaders(Request request) + { + headersLatch.countDown(); + } + + @Override + public void onSuccess(Request request) + { + successLatch.countDown(); + } + }); + connection.send(request, null); + + String requestString = endPoint.takeOutputString(); + Assert.assertTrue(requestString.startsWith("GET ")); + String content = Integer.toHexString(content1.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content1 + "\r\n"; + content += Integer.toHexString(content2.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content2 + "\r\n"; + content += "0\r\n\r\n"; + Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content)); + Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java index ac5687ef41a..a6f7de6ec3b 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java @@ -132,7 +132,7 @@ public class HttpChannelOverFCGI extends HttpChannel if (close) connection.close(); else - connection.release(); + connection.release(this); } protected void flush(Generator.Result... results) @@ -155,7 +155,7 @@ public class HttpChannelOverFCGI extends HttpChannel protected void onIdleExpired(TimeoutException timeout) { LOG.debug("Idle timeout for request {}", request); - abort(timeout); + connection.abort(timeout); } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index 599455f9709..262a7c498c9 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.fcgi.client.http; import java.io.EOFException; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -139,25 +140,19 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec private void shutdown() { - // First close then abort, to be sure that the - // connection cannot be reused from an onFailure() - // handler or by blocking code waiting for completion. - close(); - for (HttpChannelOverFCGI channel : channels.values()) - channel.abort(new EOFException()); + close(new EOFException()); } @Override protected boolean onReadTimeout() { - for (HttpChannelOverFCGI channel : channels.values()) - channel.abort(new TimeoutException()); - close(); + close(new TimeoutException()); return false; } - public void release() + protected void release(HttpChannelOverFCGI channel) { + channels.remove(channel.getRequest()); if (destination instanceof PoolingHttpDestination) { @SuppressWarnings("unchecked") @@ -169,17 +164,37 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec @Override public void close() + { + close(new AsynchronousCloseException()); + } + + private void close(Throwable failure) { if (closed.compareAndSet(false, true)) { + // First close then abort, to be sure that the connection cannot be reused + // from an onFailure() handler or by blocking code waiting for completion. getHttpDestination().close(this); getEndPoint().shutdownOutput(); LOG.debug("{} oshut", this); getEndPoint().close(); LOG.debug("{} closed", this); + + abort(failure); } } + protected void abort(Throwable failure) + { + for (HttpChannelOverFCGI channel : channels.values()) + { + HttpExchange exchange = channel.getHttpExchange(); + if (exchange != null) + exchange.getRequest().abort(failure); + } + channels.clear(); + } + private int acquireRequest() { synchronized (requests) @@ -304,7 +319,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec @Override public void onEnd(int request) { - HttpChannelOverFCGI channel = channels.remove(request); + HttpChannelOverFCGI channel = channels.get(request); if (channel != null) { channel.responseSuccess(); diff --git a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java index 3db51fd48c9..103a326eabb 100644 --- a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java +++ b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -37,6 +38,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.server.handler.AbstractHandler; @@ -513,4 +515,40 @@ public class HttpClientTest extends AbstractHttpClientServerTest Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(length, response.getContent().length); } + + @Test + public void testLongPollIsAbortedWhenClientIsStopped() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + request.startAsync(); + latch.countDown(); + } + }); + + final CountDownLatch completeLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .send(new Response.CompleteListener() + { + @Override + public void onComplete(Result result) + { + if (result.isFailed()) + completeLatch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + + // Stop the client, the complete listener must be invoked. + client.stop(); + + Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + } } diff --git a/jetty-osgi/jetty-osgi-boot/src/main/java/org/eclipse/jetty/osgi/boot/utils/internal/DefaultBundleClassLoaderHelper.java b/jetty-osgi/jetty-osgi-boot/src/main/java/org/eclipse/jetty/osgi/boot/utils/internal/DefaultBundleClassLoaderHelper.java index 79e350cacc5..199bde3c358 100644 --- a/jetty-osgi/jetty-osgi-boot/src/main/java/org/eclipse/jetty/osgi/boot/utils/internal/DefaultBundleClassLoaderHelper.java +++ b/jetty-osgi/jetty-osgi-boot/src/main/java/org/eclipse/jetty/osgi/boot/utils/internal/DefaultBundleClassLoaderHelper.java @@ -37,37 +37,81 @@ import org.osgi.framework.Bundle; public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper { private static final Logger LOG = Log.getLogger(BundleClassLoaderHelper.class); + private static enum OSGiContainerType {EquinoxOld, EquinoxLuna, FelixOld, Felix403}; + private static OSGiContainerType osgiContainer; + private static Class Equinox_BundleHost_Class; + private static Class Equinox_EquinoxBundle_Class; + private static Class Felix_BundleImpl_Class; + private static Class Felix_BundleWiring_Class; + //old equinox + private static Method Equinox_BundleHost_getBundleLoader_method; + private static Method Equinox_BundleLoader_createClassLoader_method; + //new equinox + private static Method Equinox_EquinoxBundle_getModuleClassLoader_Method; + + //new felix + private static Method Felix_BundleImpl_Adapt_Method; + //old felix + private static Field Felix_BundleImpl_m_Modules_Field; + private static Field Felix_ModuleImpl_m_ClassLoader_Field; + private static Method Felix_BundleWiring_getClassLoader_Method; - private static boolean identifiedOsgiImpl = false; - - private static boolean isEquinox = false; - - private static boolean isFelix = false; - - private static void init(Bundle bundle) + + private static void checkContainerType (Bundle bundle) { - identifiedOsgiImpl = true; + if (osgiContainer != null) + return; + try { - isEquinox = bundle.getClass().getClassLoader().loadClass("org.eclipse.osgi.framework.internal.core.BundleHost") != null; + Equinox_BundleHost_Class = bundle.getClass().getClassLoader().loadClass("org.eclipse.osgi.framework.internal.core.BundleHost"); + osgiContainer = OSGiContainerType.EquinoxOld; + return; } - catch (Throwable t) + catch (ClassNotFoundException e) { - isEquinox = false; + LOG.ignore(e); } - if (!isEquinox) + + try { + Equinox_EquinoxBundle_Class = bundle.getClass().getClassLoader().loadClass("org.eclipse.osgi.internal.framework.EquinoxBundle"); + osgiContainer = OSGiContainerType.EquinoxLuna; + return; + } + catch (ClassNotFoundException e) + { + LOG.ignore(e); + } + + try + { + //old felix or new felix? + Felix_BundleImpl_Class = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.BundleImpl"); try { - isFelix = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.BundleImpl") != null; + Felix_BundleImpl_Adapt_Method = Felix_BundleImpl_Class.getDeclaredMethod("adapt", new Class[] {Class.class}); + osgiContainer = OSGiContainerType.Felix403; + return; } - catch (Throwable t2) + catch (NoSuchMethodException e) { - isFelix = false; + osgiContainer = OSGiContainerType.FelixOld; + return; } } + catch (ClassNotFoundException e) + { + LOG.warn("Unknown OSGi container type"); + return; + } + } + + + + /** * Assuming the bundle is started. * @@ -77,7 +121,7 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper public ClassLoader getBundleClassLoader(Bundle bundle) { String bundleActivator = (String) bundle.getHeaders().get("Bundle-Activator"); - + if (bundleActivator == null) { bundleActivator = (String) bundle.getHeaders().get("Jetty-ClassInBundle"); @@ -93,80 +137,135 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper LOG.warn(e); } } - // resort to introspection - if (!identifiedOsgiImpl) + + // resort to introspection + return getBundleClassLoaderForContainer(bundle); + } + + /** + * @param bundle + * @return + */ + private ClassLoader getBundleClassLoaderForContainer (Bundle bundle) + { + checkContainerType (bundle); + if (osgiContainer == null) { - init(bundle); - } - if (isEquinox) - { - return internalGetEquinoxBundleClassLoader(bundle); - } - else if (isFelix) - { - return internalGetFelixBundleClassLoader(bundle); + LOG.warn("No classloader for unknown OSGi container type"); + return null; } - LOG.warn("No classloader found for bundle "+bundle.getSymbolicName()); - return null; + switch (osgiContainer) + { + case EquinoxOld: + case EquinoxLuna: + { + return internalGetEquinoxBundleClassLoader(bundle); + } + + case FelixOld: + case Felix403: + { + return internalGetFelixBundleClassLoader(bundle); + } + default: + { + LOG.warn("No classloader found for bundle "+bundle.getSymbolicName()); + return null; + + } + } } + + - private static Method Equinox_BundleHost_getBundleLoader_method; - - private static Method Equinox_BundleLoader_createClassLoader_method; - + /** + * @param bundle + * @return + */ private static ClassLoader internalGetEquinoxBundleClassLoader(Bundle bundle) { - // assume equinox: - try + if (osgiContainer == OSGiContainerType.EquinoxOld) { - if (Equinox_BundleHost_getBundleLoader_method == null) + try { - Equinox_BundleHost_getBundleLoader_method = - bundle.getClass().getClassLoader().loadClass("org.eclipse.osgi.framework.internal.core.BundleHost").getDeclaredMethod("getBundleLoader", new Class[] {}); - Equinox_BundleHost_getBundleLoader_method.setAccessible(true); + if (Equinox_BundleHost_getBundleLoader_method == null) + { + Equinox_BundleHost_getBundleLoader_method = + Equinox_BundleHost_Class.getDeclaredMethod("getBundleLoader", new Class[] {}); + Equinox_BundleHost_getBundleLoader_method.setAccessible(true); + } + Object bundleLoader = Equinox_BundleHost_getBundleLoader_method.invoke(bundle, new Object[] {}); + if (Equinox_BundleLoader_createClassLoader_method == null && bundleLoader != null) + { + Equinox_BundleLoader_createClassLoader_method = + bundleLoader.getClass().getClassLoader().loadClass("org.eclipse.osgi.internal.loader.BundleLoader").getDeclaredMethod("createClassLoader", new Class[] {}); + Equinox_BundleLoader_createClassLoader_method.setAccessible(true); + } + return (ClassLoader) Equinox_BundleLoader_createClassLoader_method.invoke(bundleLoader, new Object[] {}); } - Object bundleLoader = Equinox_BundleHost_getBundleLoader_method.invoke(bundle, new Object[] {}); - if (Equinox_BundleLoader_createClassLoader_method == null && bundleLoader != null) + catch (ClassNotFoundException t) { - Equinox_BundleLoader_createClassLoader_method = - bundleLoader.getClass().getClassLoader().loadClass("org.eclipse.osgi.internal.loader.BundleLoader").getDeclaredMethod("createClassLoader", new Class[] {}); - Equinox_BundleLoader_createClassLoader_method.setAccessible(true); + LOG.warn(t); + return null; + } + catch (Throwable t) + { + LOG.warn(t); + return null; } - return (ClassLoader) Equinox_BundleLoader_createClassLoader_method.invoke(bundleLoader, new Object[] {}); } - catch (Throwable t) + + if (osgiContainer == OSGiContainerType.EquinoxLuna) { - LOG.warn(t); + try + { + if (Equinox_EquinoxBundle_getModuleClassLoader_Method == null) + Equinox_EquinoxBundle_getModuleClassLoader_Method = Equinox_EquinoxBundle_Class.getDeclaredMethod("getModuleClassLoader", new Class[] {Boolean.TYPE}); + + Equinox_EquinoxBundle_getModuleClassLoader_Method.setAccessible(true); + return (ClassLoader)Equinox_EquinoxBundle_getModuleClassLoader_Method.invoke(bundle, new Object[] {Boolean.FALSE}); + } + catch (Exception e) + { + LOG.warn(e); + return null; + } } + LOG.warn("No classloader for equinox platform for bundle "+bundle.getSymbolicName()); return null; } - private static Field Felix_BundleImpl_m_modules_field; + - private static Field Felix_ModuleImpl_m_classLoader_field; - - private static Method Felix_adapt_method; - - private static Method Felix_bundle_wiring_getClassLoader_method; - - private static Class Felix_bundleWiringClazz; - - private static Boolean isFelix403 = null; + /** + * @param bundle + * @return + */ private static ClassLoader internalGetFelixBundleClassLoader(Bundle bundle) { - //firstly, try to find classes matching a newer version of felix - initFelix403(bundle); - - if (isFelix403.booleanValue()) + + if (osgiContainer == OSGiContainerType.Felix403) { try { - Object wiring = Felix_adapt_method.invoke(bundle, new Object[] {Felix_bundleWiringClazz}); - ClassLoader cl = (ClassLoader)Felix_bundle_wiring_getClassLoader_method.invoke(wiring); - return cl; + if (Felix_BundleWiring_Class == null) + Felix_BundleWiring_Class = bundle.getClass().getClassLoader().loadClass("org.osgi.framework.wiring.BundleWiring"); + + + Felix_BundleImpl_Adapt_Method.setAccessible(true); + + if (Felix_BundleWiring_getClassLoader_Method == null) + { + Felix_BundleWiring_getClassLoader_Method = Felix_BundleWiring_Class.getDeclaredMethod("getClassLoader"); + Felix_BundleWiring_getClassLoader_Method.setAccessible(true); + } + + + Object wiring = Felix_BundleImpl_Adapt_Method.invoke(bundle, new Object[] {Felix_BundleWiring_Class}); + return (ClassLoader)Felix_BundleWiring_getClassLoader_Method.invoke(wiring); } catch (Exception e) { @@ -176,123 +275,92 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper } - // Fallback to trying earlier versions of felix. - if (Felix_BundleImpl_m_modules_field == null) - { + if (osgiContainer == OSGiContainerType.FelixOld) + { try { - Class bundleImplClazz = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.BundleImpl"); - Felix_BundleImpl_m_modules_field = bundleImplClazz.getDeclaredField("m_modules"); - Felix_BundleImpl_m_modules_field.setAccessible(true); - } - catch (ClassNotFoundException e) - { - LOG.warn(e); - } - catch (NoSuchFieldException e) - { - LOG.warn(e); - } - } + if (Felix_BundleImpl_m_Modules_Field == null) + { + Felix_BundleImpl_m_Modules_Field = Felix_BundleImpl_Class.getDeclaredField("m_modules"); + Felix_BundleImpl_m_Modules_Field.setAccessible(true); + } - // Figure out which version of the modules is exported - Object currentModuleImpl; - try - { - Object[] moduleArray = (Object[]) Felix_BundleImpl_m_modules_field.get(bundle); - currentModuleImpl = moduleArray[moduleArray.length - 1]; - } - catch (Throwable t2) - { - try - { - List moduleArray = (List) Felix_BundleImpl_m_modules_field.get(bundle); - currentModuleImpl = moduleArray.get(moduleArray.size() - 1); - } + // Figure out which version of the modules is exported + Object currentModuleImpl; + + try + { + Object[] moduleArray = (Object[]) Felix_BundleImpl_m_Modules_Field.get(bundle); + currentModuleImpl = moduleArray[moduleArray.length - 1]; + } + catch (Throwable t2) + { + try + { + List moduleArray = (List) Felix_BundleImpl_m_Modules_Field.get(bundle); + currentModuleImpl = moduleArray.get(moduleArray.size() - 1); + } + catch (Exception e) + { + LOG.warn(e); + return null; + } + } + + if (Felix_ModuleImpl_m_ClassLoader_Field == null && currentModuleImpl != null) + { + try + { + Felix_ModuleImpl_m_ClassLoader_Field = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.ModuleImpl").getDeclaredField("m_classLoader"); + Felix_ModuleImpl_m_ClassLoader_Field.setAccessible(true); + } + catch (Exception e) + { + LOG.warn(e); + return null; + } + } + + // first make sure that the classloader is ready: + // the m_classLoader field must be initialized by the + // ModuleImpl.getClassLoader() private method. + ClassLoader cl = null; + try + { + cl = (ClassLoader) Felix_ModuleImpl_m_ClassLoader_Field.get(currentModuleImpl); + if (cl != null) + return cl; + } + catch (Exception e) + { + LOG.warn(e); + return null; + } + + // looks like it was not ready: + // the m_classLoader field must be initialized by the + // ModuleImpl.getClassLoader() private method. + // this call will do that. + try + { + bundle.loadClass("java.lang.Object"); + cl = (ClassLoader) Felix_ModuleImpl_m_ClassLoader_Field.get(currentModuleImpl); + return cl; + } + catch (Exception e) + { + LOG.warn(e); + return null; + } + } catch (Exception e) { LOG.warn(e); return null; } } - - if (Felix_ModuleImpl_m_classLoader_field == null && currentModuleImpl != null) - { - try - { - Felix_ModuleImpl_m_classLoader_field = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.ModuleImpl").getDeclaredField("m_classLoader"); - Felix_ModuleImpl_m_classLoader_field.setAccessible(true); - } - catch (ClassNotFoundException e) - { - LOG.warn(e); - return null; - } - catch (NoSuchFieldException e) - { - LOG.warn(e); - return null; - } - } - // first make sure that the classloader is ready: - // the m_classLoader field must be initialized by the - // ModuleImpl.getClassLoader() private method. - ClassLoader cl = null; - try - { - cl = (ClassLoader) Felix_ModuleImpl_m_classLoader_field.get(currentModuleImpl); - if (cl != null) - return cl; - } - catch (Exception e) - { - LOG.warn(e); - return null; - } - // looks like it was not ready: - // the m_classLoader field must be initialized by the - // ModuleImpl.getClassLoader() private method. - // this call will do that. - try - { - bundle.loadClass("java.lang.Object"); - cl = (ClassLoader) Felix_ModuleImpl_m_classLoader_field.get(currentModuleImpl); - return cl; - } - catch (Exception e) - { - LOG.warn(e); - return null; - } - } - - - private static void initFelix403 (Bundle bundle) - { - //see if the version of Felix is a new one - if (isFelix403 == null) - { - try - { - Class bundleImplClazz = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.BundleImpl"); - Felix_bundleWiringClazz = bundle.getClass().getClassLoader().loadClass("org.osgi.framework.wiring.BundleWiring"); - Felix_adapt_method = bundleImplClazz.getDeclaredMethod("adapt", new Class[] {Class.class}); - Felix_adapt_method.setAccessible(true); - Felix_bundle_wiring_getClassLoader_method = Felix_bundleWiringClazz.getDeclaredMethod("getClassLoader"); - Felix_bundle_wiring_getClassLoader_method.setAccessible(true); - isFelix403 = Boolean.TRUE; - } - catch (ClassNotFoundException e) - { - LOG.warn("Felix 4.x classes not found in environment"); - isFelix403 = Boolean.FALSE; - } - catch (NoSuchMethodException e) - { - LOG.warn("Felix 4.x classes not found in environment"); - isFelix403 = Boolean.FALSE; - } - } + LOG.warn("No classloader for felix platform for bundle "+bundle.getSymbolicName()); + return null; } } diff --git a/jetty-osgi/jetty-osgi-boot/src/main/java/org/eclipse/jetty/osgi/boot/utils/internal/DefaultFileLocatorHelper.java b/jetty-osgi/jetty-osgi-boot/src/main/java/org/eclipse/jetty/osgi/boot/utils/internal/DefaultFileLocatorHelper.java index c1d3bcaa732..fed6f8b6cf4 100644 --- a/jetty-osgi/jetty-osgi-boot/src/main/java/org/eclipse/jetty/osgi/boot/utils/internal/DefaultFileLocatorHelper.java +++ b/jetty-osgi/jetty-osgi-boot/src/main/java/org/eclipse/jetty/osgi/boot/utils/internal/DefaultFileLocatorHelper.java @@ -63,7 +63,25 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper // DirZipBundleEntry private static Field ZIP_FILE_FILED_FOR_ZIP_BUNDLE_FILE = null;// ZipFile + + private static final String[] FILE_BUNDLE_ENTRY_CLASSES = {"org.eclipse.osgi.baseadaptor.bundlefile.FileBundleEntry","org.eclipse.osgi.storage.bundlefile.FileBundleEntry"}; + private static final String[] ZIP_BUNDLE_ENTRY_CLASSES = {"org.eclipse.osgi.baseadaptor.bundlefile.ZipBundleEntry","org.eclipse.osgi.storage.bundlefile.ZipBundleEntry"}; + private static final String[] DIR_ZIP_BUNDLE_ENTRY_CLASSES = {"org.eclipse.osgi.baseadaptor.bundlefile.DirZipBundleEntry","org.eclipse.osgi.storage.bundlefile.DirZipBundleEntry"}; + private static final String[] BUNDLE_URL_CONNECTION_CLASSES = {"org.eclipse.osgi.framework.internal.core.BundleURLConnection", "org.eclipse.osgi.storage.url.BundleURLConnection"}; + + public static boolean match (String name, String... names) + { + if (name == null || names == null) + return false; + boolean matched = false; + for (int i=0; i< names.length && !matched; i++) + if (name.equals(names[i])) + matched = true; + return matched; + } + + /** * Works with equinox, felix, nuxeo and probably more. Not exactly in the * spirit of OSGi but quite necessary to support self-contained webapps and @@ -107,7 +125,8 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper BUNDLE_ENTRY_FIELD.setAccessible(true); } Object bundleEntry = BUNDLE_ENTRY_FIELD.get(con); - if (bundleEntry.getClass().getName().equals("org.eclipse.osgi.baseadaptor.bundlefile.FileBundleEntry")) + + if (match(bundleEntry.getClass().getName(), FILE_BUNDLE_ENTRY_CLASSES)) { if (FILE_FIELD == null) { @@ -117,7 +136,7 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper File f = (File) FILE_FIELD.get(bundleEntry); return f.getParentFile().getParentFile(); } - else if (bundleEntry.getClass().getName().equals("org.eclipse.osgi.baseadaptor.bundlefile.ZipBundleEntry")) + else if (match(bundleEntry.getClass().getName(), ZIP_BUNDLE_ENTRY_CLASSES)) { url = bundle.getEntry("/"); @@ -144,7 +163,7 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper ZipFile zipFile = (ZipFile) ZIP_FILE_FILED_FOR_ZIP_BUNDLE_FILE.get(zipBundleFile); return new File(zipFile.getName()); } - else if (bundleEntry.getClass().getName().equals("org.eclipse.osgi.baseadaptor.bundlefile.DirZipBundleEntry")) + else if (match (bundleEntry.getClass().getName(), DIR_ZIP_BUNDLE_ENTRY_CLASSES)) { // that will not happen as we did ask for the manifest not a // directory. @@ -309,7 +328,7 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper URLConnection conn = url.openConnection(); conn.setDefaultUseCaches(Resource.getDefaultUseCaches()); - if (BUNDLE_URL_CONNECTION_getLocalURL == null && conn.getClass().getName().equals("org.eclipse.osgi.framework.internal.core.BundleURLConnection")) + if (BUNDLE_URL_CONNECTION_getLocalURL == null && match(conn.getClass().getName(), BUNDLE_URL_CONNECTION_CLASSES)) { BUNDLE_URL_CONNECTION_getLocalURL = conn.getClass().getMethod("getLocalURL", null); BUNDLE_URL_CONNECTION_getLocalURL.setAccessible(true); @@ -340,7 +359,9 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper URLConnection conn = url.openConnection(); conn.setDefaultUseCaches(Resource.getDefaultUseCaches()); - if (BUNDLE_URL_CONNECTION_getFileURL == null && conn.getClass().getName().equals("org.eclipse.osgi.framework.internal.core.BundleURLConnection")) + if (BUNDLE_URL_CONNECTION_getFileURL == null + && + match (conn.getClass().getName(), BUNDLE_URL_CONNECTION_CLASSES)) { BUNDLE_URL_CONNECTION_getFileURL = conn.getClass().getMethod("getFileURL", null); BUNDLE_URL_CONNECTION_getFileURL.setAccessible(true); diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpChannelOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpChannelOverSPDY.java index bd19978803f..00f87cd498b 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpChannelOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpChannelOverSPDY.java @@ -21,17 +21,20 @@ package org.eclipse.jetty.spdy.client.http; import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.spdy.api.Session; public class HttpChannelOverSPDY extends HttpChannel { + private final HttpConnectionOverSPDY connection; private final Session session; private final HttpSenderOverSPDY sender; private final HttpReceiverOverSPDY receiver; - public HttpChannelOverSPDY(HttpDestination destination, Session session) + public HttpChannelOverSPDY(HttpDestination destination, HttpConnectionOverSPDY connection, Session session) { super(destination); + this.connection = connection; this.session = session; this.sender = new HttpSenderOverSPDY(this); this.receiver = new HttpReceiverOverSPDY(this); @@ -72,4 +75,11 @@ public class HttpChannelOverSPDY extends HttpChannel sender.abort(cause); return receiver.abort(cause); } + + @Override + public void exchangeTerminated(Result result) + { + super.exchangeTerminated(result); + connection.release(this); + } } diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java index ef96e4158eb..57ba0652e28 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java @@ -18,6 +18,9 @@ package org.eclipse.jetty.spdy.client.http; +import java.nio.channels.AsynchronousCloseException; +import java.util.Set; + import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; @@ -25,9 +28,11 @@ import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ConcurrentHashSet; public class HttpConnectionOverSPDY extends HttpConnection { + private final Set channels = new ConcurrentHashSet<>(); private final Session session; public HttpConnectionOverSPDY(HttpDestination destination, Session session) @@ -41,14 +46,35 @@ public class HttpConnectionOverSPDY extends HttpConnection { normalizeRequest(exchange.getRequest()); // One connection maps to N channels, so for each exchange we create a new channel - HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), session); + HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), this, session); + channels.add(channel); channel.associate(exchange); channel.send(); } + protected void release(HttpChannel channel) + { + channels.remove(channel); + } + @Override public void close() { + // First close then abort, to be sure that the connection cannot be reused + // from an onFailure() handler or by blocking code waiting for completion. + getHttpDestination().close(this); session.goAway(new GoAwayInfo(), new Callback.Adapter()); + abort(new AsynchronousCloseException()); + } + + private void abort(Throwable failure) + { + for (HttpChannel channel : channels) + { + HttpExchange exchange = channel.getHttpExchange(); + if (exchange != null) + exchange.getRequest().abort(failure); + } + channels.clear(); } } diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java index dff5a0785b3..bdc4c05b5f1 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java @@ -35,12 +35,4 @@ public class HttpDestinationOverSPDY extends MultiplexHttpDestination