Fix #5605 Unblock non container Threads

revert fillInterest cancellation and just abort connection instead.
tested for all transports
This commit is contained in:
gregw 2021-02-03 17:27:25 +01:00
parent 5f4919c45a
commit e9315fe51f
7 changed files with 199 additions and 195 deletions

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -363,12 +362,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
_fillInterest.register(callback);
}
@Override
public Throwable cancelFillInterest(Supplier<Throwable> cancellation)
{
return _fillInterest.cancel(cancellation);
}
@Override
public boolean tryFillInterested(Callback callback)
{

View File

@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import java.util.function.Supplier;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
@ -224,8 +223,6 @@ public interface EndPoint extends Closeable
*/
boolean isFillInterested();
Throwable cancelFillInterest(Supplier<Throwable> cancellation);
/**
* <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either
* all the data has been flushed or an error occurs.</p>

View File

@ -21,9 +21,7 @@ package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadPendingException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -44,38 +42,6 @@ public abstract class FillInterest
{
}
/**
* Cancel a fill interest registration.
*
* If there was a registration, then any {@link #fillable()}, {@link #onClose()} or {@link #onFail(Throwable)}
* calls are remembered and passed to the next registration.
* Since any actions resulting from a call to {@link #needsFillInterest()} cannot be unwound, a subsequent call to
* register will not call {@link #needsFillInterest()} again if it has already been called an no callback received.
* @param cancellation A supplier of the cancellation Throwable to use if there is an existing registration. If the
* suppler or the supplied Throwable is null, then a new {@link CancellationException} is used.
* @return The Throwable used to cancel an existing registration or null if there was no registration to cancel.
*/
public Throwable cancel(Supplier<Throwable> cancellation)
{
Cancelled cancelled = new Cancelled();
while (true)
{
Callback callback = _interested.get();
if (callback == null || callback instanceof Cancelled)
return null;
if (_interested.compareAndSet(callback, cancelled))
{
Throwable cause = cancellation == null ? null : cancellation.get();
if (cause == null)
cause = new CancellationException();
if (LOG.isDebugEnabled())
LOG.debug("cancelled {} {}",this, callback, cause);
callback.failed(cause);
return cause;
}
}
}
/**
* Call to register interest in a callback when a read is possible.
* The callback will be called either immediately if {@link #needsFillInterest()}
@ -102,63 +68,26 @@ public abstract class FillInterest
* @return true if the register succeeded
*/
public boolean tryRegister(Callback callback)
{
return register(callback, null);
}
/**
* Call to register interest in a callback when a read is possible.
* The callback will be called either immediately if {@link #needsFillInterest()}
* returns true or eventually once {@link #fillable()} is called.
*
* @param callback the callback to register
* @param cancellation A supplier of a {@link Throwable}, which if not null will be used to fail any existing registration
* @return true if the register succeeded
*/
public boolean register(Callback callback, Supplier<Throwable> cancellation)
{
if (callback == null)
throw new IllegalArgumentException();
while (true)
if (!_interested.compareAndSet(null, callback))
return false;
if (LOG.isDebugEnabled())
LOG.debug("interested {}", this);
try
{
Callback existing = _interested.get();
if (existing != null && !(existing instanceof Cancelled) && cancellation == null)
return false;
if (existing == callback)
return true;
if (_interested.compareAndSet(existing, callback))
{
if (LOG.isDebugEnabled())
LOG.debug("interested {}->{}", existing, this);
if (existing == null)
{
try
{
needsFillInterest();
}
catch (Throwable e)
{
onFail(e);
}
}
else if (existing instanceof Cancelled)
{
((Cancelled)existing).apply(callback);
}
else
{
Throwable cause = cancellation.get();
if (cause == null)
cause = new CancellationException();
existing.failed(cause);
}
return true;
}
needsFillInterest();
}
catch (Throwable e)
{
onFail(e);
}
return true;
}
/**
@ -168,19 +97,17 @@ public abstract class FillInterest
*/
public boolean fillable()
{
while (true)
if (LOG.isDebugEnabled())
LOG.debug("fillable {}", this);
Callback callback = _interested.get();
if (callback != null && _interested.compareAndSet(callback, null))
{
Callback callback = _interested.get();
if (callback == null)
return false;
if (_interested.compareAndSet(callback, null))
{
if (LOG.isDebugEnabled())
LOG.debug("fillable {} {}",this, callback);
callback.succeeded();
return true;
}
callback.succeeded();
return true;
}
if (LOG.isDebugEnabled())
LOG.debug("{} lost race {}", this, callback);
return false;
}
/**
@ -188,8 +115,7 @@ public abstract class FillInterest
*/
public boolean isInterested()
{
Callback callback = _interested.get();
return callback != null && !(callback instanceof Cancelled);
return _interested.get() != null;
}
public InvocationType getCallbackInvocationType()
@ -206,37 +132,24 @@ public abstract class FillInterest
*/
public boolean onFail(Throwable cause)
{
while (true)
if (LOG.isDebugEnabled())
LOG.debug("onFail " + this, cause);
Callback callback = _interested.get();
if (callback != null && _interested.compareAndSet(callback, null))
{
Callback callback = _interested.get();
if (callback == null)
return false;
if (_interested.compareAndSet(callback, null))
{
if (LOG.isDebugEnabled())
LOG.debug("onFail {} {}",this, callback, cause);
callback.failed(cause);
return true;
}
callback.failed(cause);
return true;
}
return false;
}
public void onClose()
{
while (true)
{
Callback callback = _interested.get();
if (callback == null)
return;
if (_interested.compareAndSet(callback, null))
{
ClosedChannelException cause = new ClosedChannelException();
if (LOG.isDebugEnabled())
LOG.debug("onFail {} {}",this, callback, cause);
callback.failed(cause);
return;
}
}
if (LOG.isDebugEnabled())
LOG.debug("onClose {}", this);
Callback callback = _interested.get();
if (callback != null && _interested.compareAndSet(callback, null))
callback.failed(new ClosedChannelException());
}
@Override
@ -258,36 +171,4 @@ public abstract class FillInterest
* @throws IOException if unable to fulfill interest in fill
*/
protected abstract void needsFillInterest() throws IOException;
private static class Cancelled implements Callback
{
private final AtomicReference<Object> _result = new AtomicReference<>();
@Override
public void succeeded()
{
_result.compareAndSet(null, Boolean.TRUE);
}
@Override
public void failed(Throwable x)
{
_result.compareAndSet(null, x == null ? new Exception() : x);
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
void apply(Callback callback)
{
Object result = _result.get();
if (result == Boolean.TRUE)
callback.succeeded();
else if (result instanceof Throwable)
callback.failed((Throwable)result);
}
}
}

View File

@ -382,30 +382,33 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError);
if (LOG.isDebugEnabled())
LOG.debug("Upgrade from {} to {}", this, connection, cancelled);
_channel.getState().upgrade();
getEndPoint().upgrade(connection);
_channel.recycle();
_parser.reset();
_generator.reset();
if (_contentBufferReferences.get() == 0)
releaseRequestBuffer();
if (isFillInterested())
abort(new IllegalStateException());
else
{
LOG.warn("{} lingering content references?!?!", this);
_requestBuffer = null; // Not returned to pool!
_contentBufferReferences.set(0);
if (LOG.isDebugEnabled())
LOG.debug("Upgrade from {} to {}", this, connection);
_channel.getState().upgrade();
getEndPoint().upgrade(connection);
_channel.recycle();
_parser.reset();
_generator.reset();
if (_contentBufferReferences.get() == 0)
releaseRequestBuffer();
else
{
LOG.warn("{} lingering content references?!?!", this);
_requestBuffer = null; // Not returned to pool!
_contentBufferReferences.set(0);
}
}
return;
}
}
boolean complete = _input.consumeAll();
Throwable cancelled = getEndPoint().cancelFillInterest(_input::getError);
if (LOG.isDebugEnabled())
LOG.debug("cancelled {}", this, cancelled);
if (isFillInterested())
abort(new IllegalStateException());
// Finish consuming the request
// If we are still expecting

View File

@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection;
@ -806,12 +805,6 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory
_endp.fillInterested(callback);
}
@Override
public Throwable cancelFillInterest(Supplier<Throwable> cancellation)
{
return _endp.cancelFillInterest(cancellation);
}
@Override
public boolean flush(ByteBuffer... buffer) throws IOException
{

View File

@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import java.util.function.Supplier;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -117,12 +116,6 @@ public class MockEndPoint implements EndPoint
throw new UnsupportedOperationException(NOT_SUPPORTED);
}
@Override
public Throwable cancelFillInterest(Supplier<Throwable> cancellation)
{
throw new UnsupportedOperationException(NOT_SUPPORTED);
}
@Override
public boolean tryFillInterested(Callback callback)
{

View File

@ -0,0 +1,144 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class BlockedIOTest extends AbstractTest<TransportScenario>
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testBlockingReadThenNormalComplete(Transport transport) throws Exception
{
CountDownLatch started = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AtomicReference<Throwable> rereadException = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
}
catch (Throwable ex1)
{
readException.set(ex1);
try
{
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
catch (Throwable ex2)
{
rereadException.set(ex2);
}
finally
{
stopped.countDown();
}
}
}).start();
try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
// give it time to block on second byte
Thread.sleep(1000);
}
catch (Throwable e)
{
throw new ServletException(e);
}
response.setStatus(200);
response.setContentType("text/plain");
response.getOutputStream().print("OK\r\n");
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch ok = new CountDownLatch(2);
scenario.client.POST(scenario.newURI())
.content(contentProvider)
.onResponseContent((response, content) ->
{
assertThat(BufferUtil.toString(content), containsString("OK"));
ok.countDown();
})
.onResponseSuccess(response ->
{
try
{
assertThat(response.getStatus(), is(200));
stopped.await(10, TimeUnit.SECONDS);
ok.countDown();
}
catch (Throwable t)
{
t.printStackTrace();
}
})
.send(null);
contentProvider.offer(BufferUtil.toBuffer("1"));
assertTrue(ok.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));
assertThat(rereadException.get(), instanceOf(IOException.class));
}
}