Merged branch 'jetty-9.4.x' into 'master'.
This commit is contained in:
commit
b9f25d78ce
|
@ -23,6 +23,11 @@ import java.io.InputStream;
|
|||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -76,11 +81,11 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
private final CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
private final CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
private final AtomicReference<InputStream> stream = new AtomicReference<>();
|
||||
private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
|
||||
private Response response;
|
||||
private Result result;
|
||||
private Throwable failure;
|
||||
private boolean closed;
|
||||
private DeferredContentProvider.Chunk chunk;
|
||||
|
||||
public InputStreamResponseListener()
|
||||
{
|
||||
|
@ -113,7 +118,9 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
closed = this.closed;
|
||||
if (!closed)
|
||||
{
|
||||
chunk = new DeferredContentProvider.Chunk(content, callback);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queueing content {}", content);
|
||||
chunks.add(new DeferredContentProvider.Chunk(content, callback));
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
@ -131,7 +138,8 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
{
|
||||
synchronized (lock)
|
||||
{
|
||||
chunk = EOF;
|
||||
if (!closed)
|
||||
chunks.add(EOF);
|
||||
lock.notifyAll();
|
||||
}
|
||||
|
||||
|
@ -142,37 +150,34 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
@Override
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
Callback callback = null;
|
||||
List<Callback> callbacks;
|
||||
synchronized (lock)
|
||||
{
|
||||
if (this.failure != null)
|
||||
return;
|
||||
this.failure = failure;
|
||||
if (chunk != null)
|
||||
callback = chunk.callback;
|
||||
callbacks = drain();
|
||||
lock.notifyAll();
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Content failure", failure);
|
||||
|
||||
if (callback != null)
|
||||
callback.failed(failure);
|
||||
callbacks.forEach(callback -> callback.failed(failure));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Throwable failure = result.getFailure();
|
||||
Callback callback = null;
|
||||
List<Callback> callbacks = Collections.emptyList();
|
||||
synchronized (lock)
|
||||
{
|
||||
this.result = result;
|
||||
if (result.isFailed() && this.failure == null)
|
||||
{
|
||||
this.failure = failure;
|
||||
if (chunk != null)
|
||||
callback = chunk.callback;
|
||||
callbacks = drain();
|
||||
}
|
||||
// Notify the response latch in case of request failures.
|
||||
responseLatch.countDown();
|
||||
|
@ -188,8 +193,7 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
LOG.debug("Result failure", failure);
|
||||
}
|
||||
|
||||
if (callback != null)
|
||||
callback.failed(failure);
|
||||
callbacks.forEach(callback -> callback.failed(failure));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -257,6 +261,23 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
return IO.getClosedStream();
|
||||
}
|
||||
|
||||
private List<Callback> drain()
|
||||
{
|
||||
List<Callback> callbacks = new ArrayList<>();
|
||||
synchronized (lock)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
DeferredContentProvider.Chunk chunk = chunks.peek();
|
||||
if (chunk == null || chunk == EOF)
|
||||
break;
|
||||
callbacks.add(chunk.callback);
|
||||
chunks.poll();
|
||||
}
|
||||
}
|
||||
return callbacks;
|
||||
}
|
||||
|
||||
private class Input extends InputStream
|
||||
{
|
||||
@Override
|
||||
|
@ -278,16 +299,22 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
Callback callback = null;
|
||||
synchronized (lock)
|
||||
{
|
||||
DeferredContentProvider.Chunk chunk;
|
||||
while (true)
|
||||
{
|
||||
if (failure != null)
|
||||
throw toIOException(failure);
|
||||
chunk = chunks.peek();
|
||||
if (chunk == EOF)
|
||||
return -1;
|
||||
if (closed)
|
||||
throw new AsynchronousCloseException();
|
||||
|
||||
if (chunk != null)
|
||||
break;
|
||||
|
||||
if (failure != null)
|
||||
throw toIOException(failure);
|
||||
|
||||
if (closed)
|
||||
throw new AsynchronousCloseException();
|
||||
|
||||
lock.wait();
|
||||
}
|
||||
|
||||
|
@ -297,7 +324,7 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
if (!buffer.hasRemaining())
|
||||
{
|
||||
callback = chunk.callback;
|
||||
chunk = null;
|
||||
chunks.poll();
|
||||
}
|
||||
}
|
||||
if (callback != null)
|
||||
|
@ -321,22 +348,21 @@ public class InputStreamResponseListener extends Listener.Adapter
|
|||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
Callback callback = null;
|
||||
List<Callback> callbacks;
|
||||
synchronized (lock)
|
||||
{
|
||||
if (closed)
|
||||
return;
|
||||
closed = true;
|
||||
if (chunk != null)
|
||||
callback = chunk.callback;
|
||||
callbacks = drain();
|
||||
lock.notifyAll();
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("InputStream close");
|
||||
|
||||
if (callback != null)
|
||||
callback.failed(new AsynchronousCloseException());
|
||||
Throwable failure = new AsynchronousCloseException();
|
||||
callbacks.forEach(callback -> callback.failed(failure));
|
||||
|
||||
super.close();
|
||||
}
|
||||
|
|
|
@ -139,13 +139,13 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
|
|||
{
|
||||
int oldSize = session.updateRecvWindow(-length);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data received, updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
|
||||
LOG.debug("Data received, {} bytes, updated session recv window {} -> {} for {}", length, oldSize, oldSize - length, session);
|
||||
|
||||
if (stream != null)
|
||||
{
|
||||
oldSize = stream.updateRecvWindow(-length);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data received, updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
|
||||
LOG.debug("Data received, {} bytes, updated stream recv window {} -> {} for {}", length, oldSize, oldSize - length, stream);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -111,13 +111,13 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
|
|||
level = sessionLevel.getAndSet(0);
|
||||
session.updateRecvWindow(level);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data consumed, updated session recv window by {}/{} for {}", level, maxLevel, session);
|
||||
LOG.debug("Data consumed, {} bytes, updated session recv window by {}/{} for {}", length, level, maxLevel, session);
|
||||
windowFrame = new WindowUpdateFrame(0, level);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data consumed, session recv window level {}/{} for {}", level, maxLevel, session);
|
||||
LOG.debug("Data consumed, {} bytes, session recv window level {}/{} for {}", length, level, maxLevel, session);
|
||||
}
|
||||
|
||||
Frame[] windowFrames = Frame.EMPTY_ARRAY;
|
||||
|
@ -126,7 +126,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
|
|||
if (stream.isClosed())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream);
|
||||
LOG.debug("Data consumed, {} bytes, ignoring update stream recv window for closed {}", length, stream);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -140,7 +140,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
|
|||
level = streamLevel.getAndSet(0);
|
||||
stream.updateRecvWindow(level);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data consumed, updated stream recv window by {}/{} for {}", level, maxLevel, stream);
|
||||
LOG.debug("Data consumed, {} bytes, updated stream recv window by {}/{} for {}", length, level, maxLevel, stream);
|
||||
WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
|
||||
if (windowFrame == null)
|
||||
windowFrame = frame;
|
||||
|
@ -150,7 +150,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
|
|||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data consumed, stream recv window level {}/{} for {}", level, maxLevel, stream);
|
||||
LOG.debug("Data consumed, {} bytes, stream recv window level {}/{} for {}", length, level, maxLevel, stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
|
||||
public class EmptyServerHandler extends AbstractHandler
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
}
|
|
@ -18,15 +18,10 @@
|
|||
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpClientTransport;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
|
@ -44,8 +39,6 @@ import org.eclipse.jetty.http2.client.http.HttpChannelOverHTTP2;
|
|||
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
|
||||
import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.Assert;
|
||||
|
@ -61,14 +54,7 @@ public class HttpChannelAssociationTest extends AbstractTest
|
|||
@Test
|
||||
public void testAssociationFailedAbortsRequest() throws Exception
|
||||
{
|
||||
startServer(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
});
|
||||
startServer(new EmptyServerHandler());
|
||||
|
||||
client = new HttpClient(newHttpClientTransport(transport, exchange -> false), sslContextFactory);
|
||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||
|
@ -90,14 +76,7 @@ public class HttpChannelAssociationTest extends AbstractTest
|
|||
@Test
|
||||
public void testIdleTimeoutJustBeforeAssociation() throws Exception
|
||||
{
|
||||
startServer(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
});
|
||||
startServer(new EmptyServerHandler());
|
||||
|
||||
long idleTimeout = 1000;
|
||||
client = new HttpClient(newHttpClientTransport(transport, exchange ->
|
||||
|
|
|
@ -116,14 +116,7 @@ public class HttpClientIdleTimeoutTest extends AbstractTest
|
|||
@Test
|
||||
public void testIdleClientIdleTimeout() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
});
|
||||
start(new EmptyServerHandler());
|
||||
client.stop();
|
||||
client.setIdleTimeout(idleTimeout);
|
||||
client.start();
|
||||
|
@ -143,14 +136,7 @@ public class HttpClientIdleTimeoutTest extends AbstractTest
|
|||
@Test
|
||||
public void testIdleServerIdleTimeout() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
});
|
||||
start(new EmptyServerHandler());
|
||||
connector.setIdleTimeout(idleTimeout);
|
||||
|
||||
ContentResponse response1 = client.newRequest(newURI()).send();
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -65,15 +65,15 @@ import org.eclipse.jetty.toolchain.test.annotation.Slow;
|
|||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
||||
public class HttpClientStreamTest extends AbstractTest
|
||||
{
|
||||
public HttpClientStreamTest(SslContextFactory sslContextFactory)
|
||||
public HttpClientStreamTest(Transport transport)
|
||||
{
|
||||
super(sslContextFactory);
|
||||
super(transport);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -90,11 +90,30 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
output.write(kb);
|
||||
}
|
||||
|
||||
start(new RespondThenConsumeHandler());
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setStatus(200);
|
||||
response.setContentLength(0);
|
||||
response.flushBuffer();
|
||||
|
||||
InputStream in = request.getInputStream();
|
||||
byte[] buffer = new byte[1024];
|
||||
while (true)
|
||||
{
|
||||
int read = in.read(buffer);
|
||||
if (read < 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
final AtomicLong requestTime = new AtomicLong();
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.file(upload)
|
||||
.onRequestSuccess(request -> requestTime.set(System.nanoTime()))
|
||||
.timeout(30, TimeUnit.SECONDS)
|
||||
|
@ -127,7 +146,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(response);
|
||||
|
@ -168,7 +187,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(response);
|
||||
|
@ -215,7 +234,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(response);
|
||||
|
@ -241,7 +260,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
// Expected.
|
||||
}
|
||||
|
||||
Assert.assertEquals(data.length, length);
|
||||
Assert.assertThat(length, Matchers.lessThanOrEqualTo(data.length));
|
||||
|
||||
Result result = listener.await(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(result);
|
||||
|
@ -267,7 +286,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
stream.close();
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(new BytesContentProvider(new byte[]{0, 1, 2, 3}))
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
|
@ -309,7 +328,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
};
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
|
@ -366,7 +385,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
};
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
|
@ -418,7 +437,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
};
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
|
@ -443,7 +462,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||
// Connect to the wrong port
|
||||
client.newRequest("localhost", port)
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Result result = listener.await(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(result);
|
||||
|
@ -464,7 +483,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
final byte[] data = new byte[]{0, 1, 2, 3};
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(new InputStreamContentProvider(new InputStream()
|
||||
{
|
||||
private int index = 0;
|
||||
|
@ -510,7 +529,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(response);
|
||||
|
@ -555,7 +574,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(response);
|
||||
|
@ -591,7 +610,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.send(listener);
|
||||
Response response = listener.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(response);
|
||||
|
@ -630,7 +649,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
try (DeferredContentProvider content = new DeferredContentProvider())
|
||||
{
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.send(result ->
|
||||
{
|
||||
|
@ -680,7 +699,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
});
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.send(result ->
|
||||
{
|
||||
|
@ -720,7 +739,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
};
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
|
@ -784,7 +803,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
{
|
||||
byte[] chunk = iteratorData[index.getAndIncrement()];
|
||||
ByteBuffer result = chunk == null ? null : ByteBuffer.wrap(chunk);
|
||||
if (index.get() == 2)
|
||||
if (index.get() < iteratorData.length)
|
||||
{
|
||||
contentRef.get().offer(result == null ? BufferUtil.EMPTY_BUFFER : result);
|
||||
contentRef.get().close();
|
||||
|
@ -802,7 +821,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
contentRef.set(content);
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
|
@ -836,7 +855,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
OutputStreamContentProvider content = new OutputStreamContentProvider();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
|
@ -879,7 +898,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
OutputStreamContentProvider content = new OutputStreamContentProvider();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.send(new BufferingResponseListener(data.length)
|
||||
{
|
||||
|
@ -920,7 +939,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
OutputStreamContentProvider content = new OutputStreamContentProvider();
|
||||
client.newRequest("0.0.0.1", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.send(result ->
|
||||
{
|
||||
|
@ -959,7 +978,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
final CountDownLatch completeLatch = new CountDownLatch(1);
|
||||
final DeferredContentProvider content = new DeferredContentProvider();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.onRequestBegin(request ->
|
||||
{
|
||||
|
@ -1007,7 +1026,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
final CountDownLatch completeLatch = new CountDownLatch(1);
|
||||
client.newRequest("0.0.0.1", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(content)
|
||||
.send(result ->
|
||||
{
|
||||
|
@ -1079,7 +1098,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
final CountDownLatch completeLatch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.content(provider)
|
||||
.onRequestCommit(request -> commit.set(true))
|
||||
.send(result ->
|
||||
|
@ -1110,7 +1129,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
|||
|
||||
InputStreamResponseListener listener = new InputStreamResponseListener();
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.scheme(getScheme())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send(listener);
|
||||
|
|
@ -305,14 +305,7 @@ public class HttpClientTest extends AbstractTest
|
|||
// Only run this test for transports over TLS.
|
||||
Assume.assumeTrue(EnumSet.of(Transport.HTTPS, Transport.H2).contains(transport));
|
||||
|
||||
startServer(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
});
|
||||
startServer(new EmptyServerHandler());
|
||||
|
||||
// Use a default SslContextFactory, requests should fail because the server certificate is unknown.
|
||||
client = newHttpClient(provideClientTransport(transport), new SslContextFactory());
|
||||
|
@ -415,14 +408,7 @@ public class HttpClientTest extends AbstractTest
|
|||
@Test
|
||||
public void testConnectionListener() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
});
|
||||
start(new EmptyServerHandler());
|
||||
|
||||
CountDownLatch openLatch = new CountDownLatch(1);
|
||||
CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
|
|
Loading…
Reference in New Issue