Fixes #10145 - WritePendingException over HTTP/2 tunnel (#10146)

Method HTTP2StreamEndPoint.flush() has a "no pending operation" semantic, but the previous implementation was calling stream.data(), which may become a pending operation if the stream is congested.

Changed the implementation of flush() to return false in the IDLE and PENDING cases.
Now every flush() is converted to a write(), which has the same semantic as stream.data().

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-07-26 22:59:24 +02:00 committed by GitHub
parent e7a088f3f0
commit a5a0a6c887
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 99 deletions

View File

@ -109,7 +109,8 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
case OSHUTTING: case OSHUTTING:
if (!writeState.compareAndSet(current, WriteState.OSHUT)) if (!writeState.compareAndSet(current, WriteState.OSHUT))
break; break;
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.from(this::oshutSuccess, this::oshutFailure)); Callback oshutCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::oshutSuccess, this::oshutFailure);
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), oshutCallback);
return; return;
case PENDING: case PENDING:
if (!writeState.compareAndSet(current, WriteState.OSHUTTING)) if (!writeState.compareAndSet(current, WriteState.OSHUTTING))
@ -177,7 +178,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
if (closed.compareAndSet(false, true)) if (closed.compareAndSet(false, true))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("closing {}, cause: {}", this, cause); LOG.debug("closing {}", this, cause);
shutdownOutput(); shutdownOutput();
stream.close(); stream.close();
onClose(cause); onClose(cause);
@ -188,7 +189,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
public int fill(ByteBuffer sink) throws IOException public int fill(ByteBuffer sink) throws IOException
{ {
Entry entry; Entry entry;
try (AutoLock l = lock.lock()) try (AutoLock ignored = lock.lock())
{ {
entry = dataQueue.poll(); entry = dataQueue.poll();
} }
@ -222,7 +223,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
if (source.hasRemaining()) if (source.hasRemaining())
{ {
try (AutoLock l = lock.lock()) try (AutoLock ignored = lock.lock())
{ {
dataQueue.offerFirst(entry); dataQueue.offerFirst(entry);
} }
@ -248,92 +249,34 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("flushing {} on {}", BufferUtil.toDetailString(buffers), this); LOG.debug("flushing {} on {}", BufferUtil.toDetailString(buffers), this);
if (buffers == null || buffers.length == 0) if (buffers == null || buffers.length == 0 || remaining(buffers) == 0)
{
return true; return true;
}
else
{
while (true)
{
WriteState current = writeState.get();
switch (current.state)
{
case IDLE:
if (!writeState.compareAndSet(current, WriteState.PENDING))
break;
// We must copy the buffers because, differently from
// write(), the semantic of flush() is that it does not
// own them, but stream.data() needs to own them.
ByteBuffer buffer = coalesce(buffers, true);
Callback.Completable callback = new Callback.Completable(Invocable.InvocationType.NON_BLOCKING);
stream.data(new DataFrame(stream.getId(), buffer, false), callback);
callback.whenComplete((nothing, failure) ->
{
if (failure == null)
flushSuccess();
else
flushFailure(failure);
});
return callback.isDone();
case PENDING:
return false;
case OSHUTTING:
case OSHUT:
throw new EofException("Output shutdown");
case FAILED:
Throwable failure = current.failure;
if (failure instanceof IOException)
throw (IOException)failure;
throw new IOException(failure);
}
}
}
}
private void flushSuccess() // Differently from other EndPoint implementations, where write() calls flush(),
{ // in this implementation all the work is done in write(), and flush() is mostly
while (true) // a no-operation.
{ // This is because the flush() semantic is that it must not leave pending
WriteState current = writeState.get(); // operations if it cannot write the buffers; therefore we cannot call
switch (current.state) // stream.data() from flush() because if the stream is congested, the buffers
{ // would not be fully written, we would return false from flush(), but
case IDLE: // stream.data() would remain as a pending operation.
case OSHUT:
throw new IllegalStateException();
case PENDING:
if (!writeState.compareAndSet(current, WriteState.IDLE))
break;
return;
case OSHUTTING:
shutdownOutput();
return;
case FAILED:
return;
}
}
}
private void flushFailure(Throwable failure) WriteState current = writeState.get();
{ switch (current.state)
while (true)
{ {
WriteState current = writeState.get(); case IDLE:
switch (current.state) case PENDING:
{ return false;
case IDLE: case OSHUTTING:
case OSHUT: case OSHUT:
throw new IllegalStateException(); throw new EofException("Output shutdown");
case PENDING: case FAILED:
if (!writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure))) Throwable failure = current.failure;
break; if (failure instanceof IOException)
return; throw (IOException)failure;
case OSHUTTING: throw new IOException(failure);
shutdownOutput(); default:
return; throw new IllegalStateException("Unexpected state: " + current.state);
case FAILED:
return;
}
} }
} }
@ -397,8 +340,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
if (!writeState.compareAndSet(current, WriteState.PENDING)) if (!writeState.compareAndSet(current, WriteState.PENDING))
break; break;
// TODO: we really need a Stream primitive to write multiple frames. // TODO: we really need a Stream primitive to write multiple frames.
ByteBuffer result = coalesce(buffers, false); ByteBuffer result = coalesce(buffers);
stream.data(new DataFrame(stream.getId(), result, false), Callback.from(() -> writeSuccess(callback), x -> writeFailure(x, callback))); Callback dataCallback = Callback.from(Invocable.getInvocationType(callback), () -> writeSuccess(callback), x -> writeFailure(x, callback));
stream.data(new DataFrame(stream.getId(), result, false), dataCallback);
return; return;
case PENDING: case PENDING:
callback.failed(new WritePendingException()); callback.failed(new WritePendingException());
@ -410,6 +354,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
case FAILED: case FAILED:
callback.failed(current.failure); callback.failed(current.failure);
return; return;
default:
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
return;
} }
} }
} }
@ -438,6 +385,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
case FAILED: case FAILED:
callback.failed(current.failure); callback.failed(current.failure);
return; return;
default:
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
return;
} }
} }
} }
@ -461,23 +411,21 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
return; return;
case FAILED: case FAILED:
return; return;
default:
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
return;
} }
} }
} }
private long remaining(ByteBuffer... buffers) private long remaining(ByteBuffer... buffers)
{ {
long total = 0; return BufferUtil.remaining(buffers);
for (ByteBuffer buffer : buffers)
{
total += buffer.remaining();
}
return total;
} }
private ByteBuffer coalesce(ByteBuffer[] buffers, boolean forceCopy) private ByteBuffer coalesce(ByteBuffer[] buffers)
{ {
if (buffers.length == 1 && !forceCopy) if (buffers.length == 1)
return buffers[0]; return buffers[0];
long capacity = remaining(buffers); long capacity = remaining(buffers);
if (capacity > Integer.MAX_VALUE) if (capacity > Integer.MAX_VALUE)
@ -567,7 +515,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
private void offer(ByteBuffer buffer, Callback callback, Throwable failure) private void offer(ByteBuffer buffer, Callback callback, Throwable failure)
{ {
try (AutoLock l = lock.lock()) try (AutoLock ignored = lock.lock())
{ {
dataQueue.offer(new Entry(buffer, callback, failure)); dataQueue.offer(new Entry(buffer, callback, failure));
} }
@ -576,7 +524,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
protected void process() protected void process()
{ {
boolean empty; boolean empty;
try (AutoLock l = lock.lock()) try (AutoLock ignored = lock.lock())
{ {
empty = dataQueue.isEmpty(); empty = dataQueue.isEmpty();
} }

View File

@ -277,7 +277,7 @@ public abstract class WriteFlusher
if (buffers != null) if (buffers != null)
{ {
if (DEBUG) if (DEBUG)
LOG.debug("flushed incomplete"); LOG.debug("flush incomplete {}", this);
PendingState pending = new PendingState(callback, address, buffers); PendingState pending = new PendingState(callback, address, buffers);
if (updateState(__WRITING, pending)) if (updateState(__WRITING, pending))
onIncompleteFlush(); onIncompleteFlush();

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http.client; package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -23,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -36,7 +38,9 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory; import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.client.util.ByteBufferRequestContent;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
@ -73,6 +77,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -288,6 +293,58 @@ public class ProxyWithDynamicTransportTest
assertEquals(1, connectionPool.getConnectionCount()); assertEquals(1, connectionPool.getConnectionCount());
} }
@ParameterizedTest(name = "proxyProtocol={0}, proxySecure={1}, serverProtocol={2}, serverSecure={3}")
@MethodSource("testParams")
public void testProxyConcurrentLoad(Origin.Protocol proxyProtocol, boolean proxySecure, HttpVersion serverProtocol, boolean serverSecure) throws Exception
{
start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
int parallelism = 8;
boolean proxyMultiplexed = proxyProtocol.getProtocols().stream().allMatch(p -> p.startsWith("h2"));
client.setMaxConnectionsPerDestination(proxyMultiplexed ? 1 : parallelism);
int proxyPort = proxySecure ? proxyTLSConnector.getLocalPort() : proxyConnector.getLocalPort();
Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort);
HttpProxy proxy = new HttpProxy(proxyAddress, proxySecure, proxyProtocol);
client.getProxyConfiguration().addProxy(proxy);
String scheme = serverSecure ? "https" : "http";
int serverPort = serverSecure ? serverTLSConnector.getLocalPort() : serverConnector.getLocalPort();
int contentLength = 128 * 1024;
int iterations = 16;
IntStream.range(0, parallelism).parallel().forEach(p ->
IntStream.range(0, iterations).forEach(i ->
{
try
{
String id = p + "-" + i;
ContentResponse response = client.newRequest("localhost", serverPort)
.scheme(scheme)
.method(HttpMethod.POST)
.path("/path/" + id)
.version(serverProtocol)
.body(new ByteBufferRequestContent(ByteBuffer.allocate(contentLength)))
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(contentLength, response.getContent().length);
}
catch (Throwable x)
{
throw new RuntimeException(x);
}
}));
}
@Test @Test
public void testHTTP2TunnelClosedByClient() throws Exception public void testHTTP2TunnelClosedByClient() throws Exception
{ {