Merged branch 'master' into 'jetty-9.2.x'.

This commit is contained in:
Simone Bordet 2014-04-07 17:51:23 +02:00
commit 2897027f53
17 changed files with 1020 additions and 709 deletions

View File

@ -129,6 +129,15 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
return true;
}
@Override
public void close()
{
super.close();
C connection = this.connection;
if (connection != null)
connection.close();
}
@Override
public void close(Connection connection)
{

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client.http;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -85,13 +86,8 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
protected boolean onReadTimeout()
{
LOG.debug("{} idle timeout", this);
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
return exchange.getRequest().abort(new TimeoutException());
getHttpDestination().close(this);
return true;
close(new TimeoutException());
return false;
}
@Override
@ -119,14 +115,23 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
@Override
public void close()
{
close(new AsynchronousCloseException());
}
protected void close(Throwable failure)
{
if (softClose())
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
getEndPoint().shutdownOutput();
LOG.debug("{} oshut", this);
getEndPoint().close();
LOG.debug("{} closed", this);
abort(failure);
}
}
@ -135,6 +140,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return closed.compareAndSet(false, true);
}
private boolean abort(Throwable failure)
{
HttpExchange exchange = channel.getHttpExchange();
return exchange != null && exchange.getRequest().abort(failure);
}
@Override
public String toString()
{

View File

@ -127,8 +127,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
// Shutting down the parser may invoke messageComplete() or earlyEOF()
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
if (!responseFailure(new EOFException()))
getHttpConnection().close();
getHttpConnection().close(new EOFException());
}
@Override

View File

@ -1161,4 +1161,40 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
Assert.assertTrue(response.getHeaders().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString()));
}
@Test
public void testLongPollIsAbortedWhenClientIsStopped() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
request.startAsync();
latch.countDown();
}
});
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Stop the client, the complete listener must be invoked.
client.stop();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -1,220 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
public class HttpReceiverTest
{
// @Rule
// public final TestTracker tracker = new TestTracker();
//
// private HttpClient client;
// private HttpDestination destination;
// private ByteArrayEndPoint endPoint;
// private HttpConnection connection;
// private HttpConversation conversation;
//
// @Before
// public void init() throws Exception
// {
// client = new HttpClient();
// client.start();
// destination = new HttpDestination(client, "http", "localhost", 8080);
// endPoint = new ByteArrayEndPoint();
// connection = new HttpConnection(client, endPoint, destination);
// conversation = new HttpConversation(client, 1);
// }
//
// @After
// public void destroy() throws Exception
// {
// client.stop();
// }
//
// protected HttpExchange newExchange()
// {
// HttpRequest request = new HttpRequest(client, URI.create("http://localhost"));
// FutureResponseListener listener = new FutureResponseListener(request);
// HttpExchange exchange = new HttpExchange(conversation, destination, request, Collections.<Response.ResponseListener>singletonList(listener));
// conversation.getExchanges().offer(exchange);
// connection.associate(exchange);
// exchange.requestComplete();
// exchange.terminateRequest();
// return exchange;
// }
//
// @Test
// public void test_Receive_NoResponseContent() throws Exception
// {
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: 0\r\n" +
// "\r\n");
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
//
// Response response = listener.get(5, TimeUnit.SECONDS);
// Assert.assertNotNull(response);
// Assert.assertEquals(200, response.getStatus());
// Assert.assertEquals("OK", response.getReason());
// Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion());
// HttpFields headers = response.getHeaders();
// Assert.assertNotNull(headers);
// Assert.assertEquals(1, headers.size());
// Assert.assertEquals("0", headers.get(HttpHeader.CONTENT_LENGTH));
// }
//
// @Test
// public void test_Receive_ResponseContent() throws Exception
// {
// String content = "0123456789ABCDEF";
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: " + content.length() + "\r\n" +
// "\r\n" +
// content);
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
//
// Response response = listener.get(5, TimeUnit.SECONDS);
// Assert.assertNotNull(response);
// Assert.assertEquals(200, response.getStatus());
// Assert.assertEquals("OK", response.getReason());
// Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion());
// HttpFields headers = response.getHeaders();
// Assert.assertNotNull(headers);
// Assert.assertEquals(1, headers.size());
// Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH));
// String received = listener.getContentAsString(StandardCharsets.UTF_8);
// Assert.assertEquals(content, received);
// }
//
// @Test
// public void test_Receive_ResponseContent_EarlyEOF() throws Exception
// {
// String content1 = "0123456789";
// String content2 = "ABCDEF";
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: " + (content1.length() + content2.length()) + "\r\n" +
// "\r\n" +
// content1);
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
// endPoint.setInputEOF();
// connection.receive();
//
// try
// {
// listener.get(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof EOFException);
// }
// }
//
// @Test
// public void test_Receive_ResponseContent_IdleTimeout() throws Exception
// {
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: 1\r\n" +
// "\r\n");
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
// // Simulate an idle timeout
// connection.idleTimeout();
//
// try
// {
// listener.get(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof TimeoutException);
// }
// }
//
// @Test
// public void test_Receive_BadResponse() throws Exception
// {
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: A\r\n" +
// "\r\n");
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
//
// try
// {
// listener.get(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof HttpResponseException);
// }
// }
//
// @Test
// public void test_Receive_GZIPResponseContent_Fragmented() throws Exception
// {
// byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
// ByteArrayOutputStream baos = new ByteArrayOutputStream();
// try (GZIPOutputStream gzipOutput = new GZIPOutputStream(baos))
// {
// gzipOutput.write(data);
// }
// byte[] gzip = baos.toByteArray();
//
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-Length: " + gzip.length + "\r\n" +
// "Content-Encoding: gzip\r\n" +
// "\r\n");
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
// endPoint.reset();
//
// ByteBuffer buffer = ByteBuffer.wrap(gzip);
// int fragment = buffer.limit() - 1;
// buffer.limit(fragment);
// endPoint.setInput(buffer);
// connection.receive();
// endPoint.reset();
//
// buffer.limit(gzip.length);
// buffer.position(fragment);
// endPoint.setInput(buffer);
// connection.receive();
//
// ContentResponse response = listener.get(5, TimeUnit.SECONDS);
// Assert.assertNotNull(response);
// Assert.assertEquals(200, response.getStatus());
// Assert.assertArrayEquals(data, response.getContent());
// }
}

View File

@ -1,280 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
public class HttpSenderTest
{
// @Rule
// public final TestTracker tracker = new TestTracker();
//
// private HttpClient client;
//
// @Before
// public void init() throws Exception
// {
// client = new HttpClient();
// client.start();
// }
//
// @After
// public void destroy() throws Exception
// {
// client.stop();
// }
//
// @Test
// public void test_Send_NoRequestContent() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// final CountDownLatch headersLatch = new CountDownLatch(1);
// final CountDownLatch successLatch = new CountDownLatch(1);
// request.listener(new Request.Listener.Adapter()
// {
// @Override
// public void onHeaders(Request request)
// {
// headersLatch.countDown();
// }
//
// @Override
// public void onSuccess(Request request)
// {
// successLatch.countDown();
// }
// });
// connection.send(request, (Response.CompleteListener)null);
//
// String requestString = endPoint.takeOutputString();
// Assert.assertTrue(requestString.startsWith("GET "));
// Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Slow
// @Test
// public void test_Send_NoRequestContent_IncompleteFlush() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// connection.send(request, (Response.CompleteListener)null);
//
// // This take will free space in the buffer and allow for the write to complete
// StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
//
// // Wait for the write to complete
// TimeUnit.SECONDS.sleep(1);
//
// String chunk = endPoint.takeOutputString();
// while (chunk.length() > 0)
// {
// builder.append(chunk);
// chunk = endPoint.takeOutputString();
// }
//
// String requestString = builder.toString();
// Assert.assertTrue(requestString.startsWith("GET "));
// Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
// }
//
// @Test
// public void test_Send_NoRequestContent_Exception() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// // Shutdown output to trigger the exception on write
// endPoint.shutdownOutput();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// final CountDownLatch failureLatch = new CountDownLatch(2);
// request.listener(new Request.Listener.Adapter()
// {
// @Override
// public void onFailure(Request request, Throwable x)
// {
// failureLatch.countDown();
// }
// });
// connection.send(request, new Response.Listener.Adapter()
// {
// @Override
// public void onComplete(Result result)
// {
// Assert.assertTrue(result.isFailed());
// failureLatch.countDown();
// }
// });
//
// Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// final CountDownLatch failureLatch = new CountDownLatch(2);
// request.listener(new Request.Listener.Adapter()
// {
// @Override
// public void onFailure(Request request, Throwable x)
// {
// failureLatch.countDown();
// }
// });
// connection.send(request, new Response.Listener.Adapter()
// {
// @Override
// public void onComplete(Result result)
// {
// Assert.assertTrue(result.isFailed());
// failureLatch.countDown();
// }
// });
//
// // Shutdown output to trigger the exception on write
// endPoint.shutdownOutput();
// // This take will free space in the buffer and allow for the write to complete
// // although it will fail because we shut down the output
// endPoint.takeOutputString();
//
// Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void test_Send_SmallRequestContent_InOneBuffer() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// String content = "abcdef";
// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))));
// final CountDownLatch headersLatch = new CountDownLatch(1);
// final CountDownLatch successLatch = new CountDownLatch(1);
// request.listener(new Request.Listener.Adapter()
// {
// @Override
// public void onHeaders(Request request)
// {
// headersLatch.countDown();
// }
//
// @Override
// public void onSuccess(Request request)
// {
// successLatch.countDown();
// }
// });
// connection.send(request, (Response.CompleteListener)null);
//
// String requestString = endPoint.takeOutputString();
// Assert.assertTrue(requestString.startsWith("GET "));
// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// String content1 = "0123456789";
// String content2 = "abcdef";
// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(content2.getBytes(StandardCharsets.UTF_8))));
// final CountDownLatch headersLatch = new CountDownLatch(1);
// final CountDownLatch successLatch = new CountDownLatch(1);
// request.listener(new Request.Listener.Adapter()
// {
// @Override
// public void onHeaders(Request request)
// {
// headersLatch.countDown();
// }
//
// @Override
// public void onSuccess(Request request)
// {
// successLatch.countDown();
// }
// });
// connection.send(request, (Response.CompleteListener)null);
//
// String requestString = endPoint.takeOutputString();
// Assert.assertTrue(requestString.startsWith("GET "));
// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content1 + content2));
// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// String content1 = "0123456789";
// String content2 = "ABCDEF";
// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(content2.getBytes(StandardCharsets.UTF_8)))
// {
// @Override
// public long getLength()
// {
// return -1;
// }
// });
// final CountDownLatch headersLatch = new CountDownLatch(1);
// final CountDownLatch successLatch = new CountDownLatch(1);
// request.listener(new Request.Listener.Adapter()
// {
// @Override
// public void onHeaders(Request request)
// {
// headersLatch.countDown();
// }
//
// @Override
// public void onSuccess(Request request)
// {
// successLatch.countDown();
// }
// });
// connection.send(request, (Response.CompleteListener)null);
//
// String requestString = endPoint.takeOutputString();
// Assert.assertTrue(requestString.startsWith("GET "));
// String content = Integer.toHexString(content1.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content1 + "\r\n";
// content += Integer.toHexString(content2.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content2 + "\r\n";
// content += "0\r\n\r\n";
// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
}

View File

@ -0,0 +1,246 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class HttpReceiverOverHTTPTest
{
@Rule
public final TestTracker tracker = new TestTracker();
private HttpClient client;
private HttpDestinationOverHTTP destination;
private ByteArrayEndPoint endPoint;
private HttpConnectionOverHTTP connection;
@Before
public void init() throws Exception
{
client = new HttpClient();
client.start();
destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
endPoint = new ByteArrayEndPoint();
connection = new HttpConnectionOverHTTP(endPoint, destination);
}
@After
public void destroy() throws Exception
{
client.stop();
}
protected HttpExchange newExchange()
{
HttpRequest request = (HttpRequest)client.newRequest("http://localhost");
FutureResponseListener listener = new FutureResponseListener(request);
HttpExchange exchange = new HttpExchange(destination, request, Collections.<Response.ResponseListener>singletonList(listener));
connection.getHttpChannel().associate(exchange);
exchange.requestComplete();
exchange.terminateRequest(null);
return exchange;
}
@Test
public void test_Receive_NoResponseContent() throws Exception
{
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: 0\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals("OK", response.getReason());
Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion());
HttpFields headers = response.getHeaders();
Assert.assertNotNull(headers);
Assert.assertEquals(1, headers.size());
Assert.assertEquals("0", headers.get(HttpHeader.CONTENT_LENGTH));
}
@Test
public void test_Receive_ResponseContent() throws Exception
{
String content = "0123456789ABCDEF";
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: " + content.length() + "\r\n" +
"\r\n" +
content);
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals("OK", response.getReason());
Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion());
HttpFields headers = response.getHeaders();
Assert.assertNotNull(headers);
Assert.assertEquals(1, headers.size());
Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH));
String received = listener.getContentAsString(StandardCharsets.UTF_8);
Assert.assertEquals(content, received);
}
@Test
public void test_Receive_ResponseContent_EarlyEOF() throws Exception
{
String content1 = "0123456789";
String content2 = "ABCDEF";
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1);
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive();
endPoint.setInputEOF();
connection.getHttpChannel().receive();
try
{
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof EOFException);
}
}
@Test
public void test_Receive_ResponseContent_IdleTimeout() throws Exception
{
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: 1\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive();
// Simulate an idle timeout
connection.onReadTimeout();
try
{
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof TimeoutException);
}
}
@Test
public void test_Receive_BadResponse() throws Exception
{
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: A\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive();
try
{
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof HttpResponseException);
}
}
@Test
public void test_Receive_GZIPResponseContent_Fragmented() throws Exception
{
byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutput = new GZIPOutputStream(baos))
{
gzipOutput.write(data);
}
byte[] gzip = baos.toByteArray();
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-Length: " + gzip.length + "\r\n" +
"Content-Encoding: gzip\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive();
endPoint.reset();
ByteBuffer buffer = ByteBuffer.wrap(gzip);
int fragment = buffer.limit() - 1;
buffer.limit(fragment);
endPoint.setInput(buffer);
connection.getHttpChannel().receive();
endPoint.reset();
buffer.limit(gzip.length);
buffer.position(fragment);
endPoint.setInput(buffer);
connection.getHttpChannel().receive();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
}

View File

@ -0,0 +1,302 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class HttpSenderOverHTTPTest
{
@Rule
public final TestTracker tracker = new TestTracker();
private HttpClient client;
@Before
public void init() throws Exception
{
client = new HttpClient();
client.start();
}
@After
public void destroy() throws Exception
{
client.stop();
}
@Test
public void test_Send_NoRequestContent() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
request.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(request, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Slow
@Test
public void test_Send_NoRequestContent_IncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
connection.send(request, null);
// This take will free space in the buffer and allow for the write to complete
StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
// Wait for the write to complete
TimeUnit.SECONDS.sleep(1);
String chunk = endPoint.takeOutputString();
while (chunk.length() > 0)
{
builder.append(chunk);
chunk = endPoint.takeOutputString();
}
String requestString = builder.toString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
}
@Test
public void test_Send_NoRequestContent_Exception() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.listener(new Request.Listener.Adapter()
{
@Override
public void onFailure(Request request, Throwable x)
{
failureLatch.countDown();
}
});
connection.send(request, new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
failureLatch.countDown();
}
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.listener(new Request.Listener.Adapter()
{
@Override
public void onFailure(Request request, Throwable x)
{
failureLatch.countDown();
}
});
connection.send(request, new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
failureLatch.countDown();
}
});
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
// This take will free space in the buffer and allow for the write to complete
// although it will fail because we shut down the output
endPoint.takeOutputString();
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_InOneBuffer() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content = "abcdef";
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
request.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(request, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "abcdef";
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(content2.getBytes(StandardCharsets.UTF_8))));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
request.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(request, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content1 + content2));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "ABCDEF";
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(content2.getBytes(StandardCharsets.UTF_8)))
{
@Override
public long getLength()
{
return -1;
}
});
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
request.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(request, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
String content = Integer.toHexString(content1.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content1 + "\r\n";
content += Integer.toHexString(content2.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content2 + "\r\n";
content += "0\r\n\r\n";
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -132,7 +132,7 @@ public class HttpChannelOverFCGI extends HttpChannel
if (close)
connection.close();
else
connection.release();
connection.release(this);
}
protected void flush(Generator.Result... results)
@ -155,7 +155,7 @@ public class HttpChannelOverFCGI extends HttpChannel
protected void onIdleExpired(TimeoutException timeout)
{
LOG.debug("Idle timeout for request {}", request);
abort(timeout);
connection.abort(timeout);
}
@Override

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.fcgi.client.http;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -139,25 +140,19 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private void shutdown()
{
// First close then abort, to be sure that the
// connection cannot be reused from an onFailure()
// handler or by blocking code waiting for completion.
close();
for (HttpChannelOverFCGI channel : channels.values())
channel.abort(new EOFException());
close(new EOFException());
}
@Override
protected boolean onReadTimeout()
{
for (HttpChannelOverFCGI channel : channels.values())
channel.abort(new TimeoutException());
close();
close(new TimeoutException());
return false;
}
public void release()
protected void release(HttpChannelOverFCGI channel)
{
channels.remove(channel.getRequest());
if (destination instanceof PoolingHttpDestination)
{
@SuppressWarnings("unchecked")
@ -169,17 +164,37 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override
public void close()
{
close(new AsynchronousCloseException());
}
private void close(Throwable failure)
{
if (closed.compareAndSet(false, true))
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
getEndPoint().shutdownOutput();
LOG.debug("{} oshut", this);
getEndPoint().close();
LOG.debug("{} closed", this);
abort(failure);
}
}
protected void abort(Throwable failure)
{
for (HttpChannelOverFCGI channel : channels.values())
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
}
channels.clear();
}
private int acquireRequest()
{
synchronized (requests)
@ -304,7 +319,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override
public void onEnd(int request)
{
HttpChannelOverFCGI channel = channels.remove(request);
HttpChannelOverFCGI channel = channels.get(request);
if (channel != null)
{
channel.responseSuccess();

View File

@ -24,6 +24,7 @@ import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -37,6 +38,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.handler.AbstractHandler;
@ -513,4 +515,40 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(length, response.getContent().length);
}
@Test
public void testLongPollIsAbortedWhenClientIsStopped() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
request.startAsync();
latch.countDown();
}
});
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Stop the client, the complete listener must be invoked.
client.stop();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -37,37 +37,81 @@ import org.osgi.framework.Bundle;
public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper
{
private static final Logger LOG = Log.getLogger(BundleClassLoaderHelper.class);
private static enum OSGiContainerType {EquinoxOld, EquinoxLuna, FelixOld, Felix403};
private static OSGiContainerType osgiContainer;
private static Class Equinox_BundleHost_Class;
private static Class Equinox_EquinoxBundle_Class;
private static Class Felix_BundleImpl_Class;
private static Class Felix_BundleWiring_Class;
//old equinox
private static Method Equinox_BundleHost_getBundleLoader_method;
private static Method Equinox_BundleLoader_createClassLoader_method;
//new equinox
private static Method Equinox_EquinoxBundle_getModuleClassLoader_Method;
private static boolean identifiedOsgiImpl = false;
//new felix
private static Method Felix_BundleImpl_Adapt_Method;
//old felix
private static Field Felix_BundleImpl_m_Modules_Field;
private static Field Felix_ModuleImpl_m_ClassLoader_Field;
private static Method Felix_BundleWiring_getClassLoader_Method;
private static boolean isEquinox = false;
private static boolean isFelix = false;
private static void init(Bundle bundle)
private static void checkContainerType (Bundle bundle)
{
identifiedOsgiImpl = true;
if (osgiContainer != null)
return;
try
{
isEquinox = bundle.getClass().getClassLoader().loadClass("org.eclipse.osgi.framework.internal.core.BundleHost") != null;
Equinox_BundleHost_Class = bundle.getClass().getClassLoader().loadClass("org.eclipse.osgi.framework.internal.core.BundleHost");
osgiContainer = OSGiContainerType.EquinoxOld;
return;
}
catch (Throwable t)
catch (ClassNotFoundException e)
{
isEquinox = false;
LOG.ignore(e);
}
if (!isEquinox)
{
try
{
isFelix = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.BundleImpl") != null;
Equinox_EquinoxBundle_Class = bundle.getClass().getClassLoader().loadClass("org.eclipse.osgi.internal.framework.EquinoxBundle");
osgiContainer = OSGiContainerType.EquinoxLuna;
return;
}
catch (Throwable t2)
catch (ClassNotFoundException e)
{
isFelix = false;
LOG.ignore(e);
}
try
{
//old felix or new felix?
Felix_BundleImpl_Class = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.BundleImpl");
try
{
Felix_BundleImpl_Adapt_Method = Felix_BundleImpl_Class.getDeclaredMethod("adapt", new Class[] {Class.class});
osgiContainer = OSGiContainerType.Felix403;
return;
}
catch (NoSuchMethodException e)
{
osgiContainer = OSGiContainerType.FelixOld;
return;
}
}
catch (ClassNotFoundException e)
{
LOG.warn("Unknown OSGi container type");
return;
}
}
/**
* Assuming the bundle is started.
*
@ -93,37 +137,62 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper
LOG.warn(e);
}
}
// resort to introspection
if (!identifiedOsgiImpl)
{
init(bundle);
}
if (isEquinox)
{
return internalGetEquinoxBundleClassLoader(bundle);
}
else if (isFelix)
{
return internalGetFelixBundleClassLoader(bundle);
return getBundleClassLoaderForContainer(bundle);
}
LOG.warn("No classloader found for bundle "+bundle.getSymbolicName());
/**
* @param bundle
* @return
*/
private ClassLoader getBundleClassLoaderForContainer (Bundle bundle)
{
checkContainerType (bundle);
if (osgiContainer == null)
{
LOG.warn("No classloader for unknown OSGi container type");
return null;
}
private static Method Equinox_BundleHost_getBundleLoader_method;
switch (osgiContainer)
{
case EquinoxOld:
case EquinoxLuna:
{
return internalGetEquinoxBundleClassLoader(bundle);
}
private static Method Equinox_BundleLoader_createClassLoader_method;
case FelixOld:
case Felix403:
{
return internalGetFelixBundleClassLoader(bundle);
}
default:
{
LOG.warn("No classloader found for bundle "+bundle.getSymbolicName());
return null;
}
}
}
/**
* @param bundle
* @return
*/
private static ClassLoader internalGetEquinoxBundleClassLoader(Bundle bundle)
{
// assume equinox:
if (osgiContainer == OSGiContainerType.EquinoxOld)
{
try
{
if (Equinox_BundleHost_getBundleLoader_method == null)
{
Equinox_BundleHost_getBundleLoader_method =
bundle.getClass().getClassLoader().loadClass("org.eclipse.osgi.framework.internal.core.BundleHost").getDeclaredMethod("getBundleLoader", new Class[] {});
Equinox_BundleHost_Class.getDeclaredMethod("getBundleLoader", new Class[] {});
Equinox_BundleHost_getBundleLoader_method.setAccessible(true);
}
Object bundleLoader = Equinox_BundleHost_getBundleLoader_method.invoke(bundle, new Object[] {});
@ -135,38 +204,68 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper
}
return (ClassLoader) Equinox_BundleLoader_createClassLoader_method.invoke(bundleLoader, new Object[] {});
}
catch (ClassNotFoundException t)
{
LOG.warn(t);
return null;
}
catch (Throwable t)
{
LOG.warn(t);
return null;
}
}
if (osgiContainer == OSGiContainerType.EquinoxLuna)
{
try
{
if (Equinox_EquinoxBundle_getModuleClassLoader_Method == null)
Equinox_EquinoxBundle_getModuleClassLoader_Method = Equinox_EquinoxBundle_Class.getDeclaredMethod("getModuleClassLoader", new Class[] {Boolean.TYPE});
Equinox_EquinoxBundle_getModuleClassLoader_Method.setAccessible(true);
return (ClassLoader)Equinox_EquinoxBundle_getModuleClassLoader_Method.invoke(bundle, new Object[] {Boolean.FALSE});
}
catch (Exception e)
{
LOG.warn(e);
return null;
}
}
LOG.warn("No classloader for equinox platform for bundle "+bundle.getSymbolicName());
return null;
}
private static Field Felix_BundleImpl_m_modules_field;
private static Field Felix_ModuleImpl_m_classLoader_field;
private static Method Felix_adapt_method;
private static Method Felix_bundle_wiring_getClassLoader_method;
private static Class Felix_bundleWiringClazz;
private static Boolean isFelix403 = null;
/**
* @param bundle
* @return
*/
private static ClassLoader internalGetFelixBundleClassLoader(Bundle bundle)
{
//firstly, try to find classes matching a newer version of felix
initFelix403(bundle);
if (isFelix403.booleanValue())
if (osgiContainer == OSGiContainerType.Felix403)
{
try
{
Object wiring = Felix_adapt_method.invoke(bundle, new Object[] {Felix_bundleWiringClazz});
ClassLoader cl = (ClassLoader)Felix_bundle_wiring_getClassLoader_method.invoke(wiring);
return cl;
if (Felix_BundleWiring_Class == null)
Felix_BundleWiring_Class = bundle.getClass().getClassLoader().loadClass("org.osgi.framework.wiring.BundleWiring");
Felix_BundleImpl_Adapt_Method.setAccessible(true);
if (Felix_BundleWiring_getClassLoader_Method == null)
{
Felix_BundleWiring_getClassLoader_Method = Felix_BundleWiring_Class.getDeclaredMethod("getClassLoader");
Felix_BundleWiring_getClassLoader_Method.setAccessible(true);
}
Object wiring = Felix_BundleImpl_Adapt_Method.invoke(bundle, new Object[] {Felix_BundleWiring_Class});
return (ClassLoader)Felix_BundleWiring_getClassLoader_Method.invoke(wiring);
}
catch (Exception e)
{
@ -176,37 +275,29 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper
}
// Fallback to trying earlier versions of felix.
if (Felix_BundleImpl_m_modules_field == null)
if (osgiContainer == OSGiContainerType.FelixOld)
{
try
{
Class bundleImplClazz = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.BundleImpl");
Felix_BundleImpl_m_modules_field = bundleImplClazz.getDeclaredField("m_modules");
Felix_BundleImpl_m_modules_field.setAccessible(true);
}
catch (ClassNotFoundException e)
if (Felix_BundleImpl_m_Modules_Field == null)
{
LOG.warn(e);
}
catch (NoSuchFieldException e)
{
LOG.warn(e);
}
Felix_BundleImpl_m_Modules_Field = Felix_BundleImpl_Class.getDeclaredField("m_modules");
Felix_BundleImpl_m_Modules_Field.setAccessible(true);
}
// Figure out which version of the modules is exported
Object currentModuleImpl;
try
{
Object[] moduleArray = (Object[]) Felix_BundleImpl_m_modules_field.get(bundle);
Object[] moduleArray = (Object[]) Felix_BundleImpl_m_Modules_Field.get(bundle);
currentModuleImpl = moduleArray[moduleArray.length - 1];
}
catch (Throwable t2)
{
try
{
List<Object> moduleArray = (List<Object>) Felix_BundleImpl_m_modules_field.get(bundle);
List<Object> moduleArray = (List<Object>) Felix_BundleImpl_m_Modules_Field.get(bundle);
currentModuleImpl = moduleArray.get(moduleArray.size() - 1);
}
catch (Exception e)
@ -216,31 +307,27 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper
}
}
if (Felix_ModuleImpl_m_classLoader_field == null && currentModuleImpl != null)
if (Felix_ModuleImpl_m_ClassLoader_Field == null && currentModuleImpl != null)
{
try
{
Felix_ModuleImpl_m_classLoader_field = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.ModuleImpl").getDeclaredField("m_classLoader");
Felix_ModuleImpl_m_classLoader_field.setAccessible(true);
Felix_ModuleImpl_m_ClassLoader_Field = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.ModuleImpl").getDeclaredField("m_classLoader");
Felix_ModuleImpl_m_ClassLoader_Field.setAccessible(true);
}
catch (ClassNotFoundException e)
{
LOG.warn(e);
return null;
}
catch (NoSuchFieldException e)
catch (Exception e)
{
LOG.warn(e);
return null;
}
}
// first make sure that the classloader is ready:
// the m_classLoader field must be initialized by the
// ModuleImpl.getClassLoader() private method.
ClassLoader cl = null;
try
{
cl = (ClassLoader) Felix_ModuleImpl_m_classLoader_field.get(currentModuleImpl);
cl = (ClassLoader) Felix_ModuleImpl_m_ClassLoader_Field.get(currentModuleImpl);
if (cl != null)
return cl;
}
@ -257,7 +344,7 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper
try
{
bundle.loadClass("java.lang.Object");
cl = (ClassLoader) Felix_ModuleImpl_m_classLoader_field.get(currentModuleImpl);
cl = (ClassLoader) Felix_ModuleImpl_m_ClassLoader_Field.get(currentModuleImpl);
return cl;
}
catch (Exception e)
@ -266,33 +353,14 @@ public class DefaultBundleClassLoaderHelper implements BundleClassLoaderHelper
return null;
}
}
catch (Exception e)
{
LOG.warn(e);
return null;
}
}
private static void initFelix403 (Bundle bundle)
{
//see if the version of Felix is a new one
if (isFelix403 == null)
{
try
{
Class bundleImplClazz = bundle.getClass().getClassLoader().loadClass("org.apache.felix.framework.BundleImpl");
Felix_bundleWiringClazz = bundle.getClass().getClassLoader().loadClass("org.osgi.framework.wiring.BundleWiring");
Felix_adapt_method = bundleImplClazz.getDeclaredMethod("adapt", new Class[] {Class.class});
Felix_adapt_method.setAccessible(true);
Felix_bundle_wiring_getClassLoader_method = Felix_bundleWiringClazz.getDeclaredMethod("getClassLoader");
Felix_bundle_wiring_getClassLoader_method.setAccessible(true);
isFelix403 = Boolean.TRUE;
}
catch (ClassNotFoundException e)
{
LOG.warn("Felix 4.x classes not found in environment");
isFelix403 = Boolean.FALSE;
}
catch (NoSuchMethodException e)
{
LOG.warn("Felix 4.x classes not found in environment");
isFelix403 = Boolean.FALSE;
}
}
LOG.warn("No classloader for felix platform for bundle "+bundle.getSymbolicName());
return null;
}
}

View File

@ -64,6 +64,24 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper
private static Field ZIP_FILE_FILED_FOR_ZIP_BUNDLE_FILE = null;// ZipFile
private static final String[] FILE_BUNDLE_ENTRY_CLASSES = {"org.eclipse.osgi.baseadaptor.bundlefile.FileBundleEntry","org.eclipse.osgi.storage.bundlefile.FileBundleEntry"};
private static final String[] ZIP_BUNDLE_ENTRY_CLASSES = {"org.eclipse.osgi.baseadaptor.bundlefile.ZipBundleEntry","org.eclipse.osgi.storage.bundlefile.ZipBundleEntry"};
private static final String[] DIR_ZIP_BUNDLE_ENTRY_CLASSES = {"org.eclipse.osgi.baseadaptor.bundlefile.DirZipBundleEntry","org.eclipse.osgi.storage.bundlefile.DirZipBundleEntry"};
private static final String[] BUNDLE_URL_CONNECTION_CLASSES = {"org.eclipse.osgi.framework.internal.core.BundleURLConnection", "org.eclipse.osgi.storage.url.BundleURLConnection"};
public static boolean match (String name, String... names)
{
if (name == null || names == null)
return false;
boolean matched = false;
for (int i=0; i< names.length && !matched; i++)
if (name.equals(names[i]))
matched = true;
return matched;
}
/**
* Works with equinox, felix, nuxeo and probably more. Not exactly in the
* spirit of OSGi but quite necessary to support self-contained webapps and
@ -107,7 +125,8 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper
BUNDLE_ENTRY_FIELD.setAccessible(true);
}
Object bundleEntry = BUNDLE_ENTRY_FIELD.get(con);
if (bundleEntry.getClass().getName().equals("org.eclipse.osgi.baseadaptor.bundlefile.FileBundleEntry"))
if (match(bundleEntry.getClass().getName(), FILE_BUNDLE_ENTRY_CLASSES))
{
if (FILE_FIELD == null)
{
@ -117,7 +136,7 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper
File f = (File) FILE_FIELD.get(bundleEntry);
return f.getParentFile().getParentFile();
}
else if (bundleEntry.getClass().getName().equals("org.eclipse.osgi.baseadaptor.bundlefile.ZipBundleEntry"))
else if (match(bundleEntry.getClass().getName(), ZIP_BUNDLE_ENTRY_CLASSES))
{
url = bundle.getEntry("/");
@ -144,7 +163,7 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper
ZipFile zipFile = (ZipFile) ZIP_FILE_FILED_FOR_ZIP_BUNDLE_FILE.get(zipBundleFile);
return new File(zipFile.getName());
}
else if (bundleEntry.getClass().getName().equals("org.eclipse.osgi.baseadaptor.bundlefile.DirZipBundleEntry"))
else if (match (bundleEntry.getClass().getName(), DIR_ZIP_BUNDLE_ENTRY_CLASSES))
{
// that will not happen as we did ask for the manifest not a
// directory.
@ -309,7 +328,7 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper
URLConnection conn = url.openConnection();
conn.setDefaultUseCaches(Resource.getDefaultUseCaches());
if (BUNDLE_URL_CONNECTION_getLocalURL == null && conn.getClass().getName().equals("org.eclipse.osgi.framework.internal.core.BundleURLConnection"))
if (BUNDLE_URL_CONNECTION_getLocalURL == null && match(conn.getClass().getName(), BUNDLE_URL_CONNECTION_CLASSES))
{
BUNDLE_URL_CONNECTION_getLocalURL = conn.getClass().getMethod("getLocalURL", null);
BUNDLE_URL_CONNECTION_getLocalURL.setAccessible(true);
@ -340,7 +359,9 @@ public class DefaultFileLocatorHelper implements BundleFileLocatorHelper
URLConnection conn = url.openConnection();
conn.setDefaultUseCaches(Resource.getDefaultUseCaches());
if (BUNDLE_URL_CONNECTION_getFileURL == null && conn.getClass().getName().equals("org.eclipse.osgi.framework.internal.core.BundleURLConnection"))
if (BUNDLE_URL_CONNECTION_getFileURL == null
&&
match (conn.getClass().getName(), BUNDLE_URL_CONNECTION_CLASSES))
{
BUNDLE_URL_CONNECTION_getFileURL = conn.getClass().getMethod("getFileURL", null);
BUNDLE_URL_CONNECTION_getFileURL.setAccessible(true);

View File

@ -21,17 +21,20 @@ package org.eclipse.jetty.spdy.client.http;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.spdy.api.Session;
public class HttpChannelOverSPDY extends HttpChannel
{
private final HttpConnectionOverSPDY connection;
private final Session session;
private final HttpSenderOverSPDY sender;
private final HttpReceiverOverSPDY receiver;
public HttpChannelOverSPDY(HttpDestination destination, Session session)
public HttpChannelOverSPDY(HttpDestination destination, HttpConnectionOverSPDY connection, Session session)
{
super(destination);
this.connection = connection;
this.session = session;
this.sender = new HttpSenderOverSPDY(this);
this.receiver = new HttpReceiverOverSPDY(this);
@ -72,4 +75,11 @@ public class HttpChannelOverSPDY extends HttpChannel
sender.abort(cause);
return receiver.abort(cause);
}
@Override
public void exchangeTerminated(Result result)
{
super.exchangeTerminated(result);
connection.release(this);
}
}

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.spdy.client.http;
import java.nio.channels.AsynchronousCloseException;
import java.util.Set;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
@ -25,9 +28,11 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentHashSet;
public class HttpConnectionOverSPDY extends HttpConnection
{
private final Set<HttpChannel> channels = new ConcurrentHashSet<>();
private final Session session;
public HttpConnectionOverSPDY(HttpDestination destination, Session session)
@ -41,14 +46,35 @@ public class HttpConnectionOverSPDY extends HttpConnection
{
normalizeRequest(exchange.getRequest());
// One connection maps to N channels, so for each exchange we create a new channel
HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), session);
HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), this, session);
channels.add(channel);
channel.associate(exchange);
channel.send();
}
protected void release(HttpChannel channel)
{
channels.remove(channel);
}
@Override
public void close()
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
session.goAway(new GoAwayInfo(), new Callback.Adapter());
abort(new AsynchronousCloseException());
}
private void abort(Throwable failure)
{
for (HttpChannel channel : channels)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
}
channels.clear();
}
}

View File

@ -35,12 +35,4 @@ public class HttpDestinationOverSPDY extends MultiplexHttpDestination<HttpConnec
{
connection.send(exchange);
}
@Override
public void abort(Throwable cause)
{
// TODO: in case of connection failure, we need to abort also
// TODO: all pending exchanges, so we need to track them.
super.abort(cause);
}
}

View File

@ -23,6 +23,7 @@ import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -36,6 +37,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.handler.AbstractHandler;
@ -426,4 +428,40 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(length, response.getContent().length);
}
@Test
public void testLongPollIsAbortedWhenClientIsStopped() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
request.startAsync();
latch.countDown();
}
});
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Stop the client, the complete listener must be invoked.
client.stop();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
}