Merge branch 'jetty-12.0.x' into fix/jetty-12/reset-no-error

This commit is contained in:
gregw 2023-12-15 14:31:15 +11:00
commit 5d58eff13d
37 changed files with 582 additions and 92 deletions

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.client;
import java.io.Closeable;
import java.net.SocketAddress;
import org.eclipse.jetty.util.Promise;
@ -46,4 +47,20 @@ public interface Connection extends Closeable
* @see #close()
*/
boolean isClosed();
/**
* @return the local socket address associated with the connection
*/
default SocketAddress getLocalSocketAddress()
{
return null;
}
/**
* @return the remote socket address associated with the connection
*/
default SocketAddress getRemoteSocketAddress()
{
return null;
}
}

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.List;
import java.util.Map;
@ -276,6 +277,18 @@ public class HttpProxy extends ProxyConfiguration.Proxy
this.promise = promise;
}
@Override
public SocketAddress getLocalSocketAddress()
{
return connection.getLocalSocketAddress();
}
@Override
public SocketAddress getRemoteSocketAddress()
{
return connection.getRemoteSocketAddress();
}
@Override
public void send(Request request, Response.CompleteListener listener)
{

View File

@ -48,6 +48,22 @@ import org.eclipse.jetty.util.Fields;
*/
public interface Request
{
/**
* <p>Returns the connection associated with this request.</p>
* <p>The connection is available only starting from the
* {@link #onRequestBegin(BeginListener) request begin} event,
* when a connection is associated with the request to be sent,
* otherwise {@code null} is returned.</p>
*
* @return the connection associated with this request,
* or {@code null} if there is no connection associated
* with this request
*/
default Connection getConnection()
{
return null;
}
/**
* @return the URI scheme of this request, such as "http" or "https"
*/

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.client.transport;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.Promise;
@ -54,7 +55,7 @@ public abstract class HttpChannel implements CyclicTimeouts.Expirable
{
boolean result = false;
boolean abort = true;
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
if (_exchange == null)
{
@ -65,12 +66,14 @@ public abstract class HttpChannel implements CyclicTimeouts.Expirable
}
}
HttpRequest request = exchange.getRequest();
if (abort)
{
exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported"));
request.abort(new UnsupportedOperationException("Pipelined requests not supported"));
}
else
{
request.setConnection(getConnection());
if (LOG.isDebugEnabled())
LOG.debug("{} associated {} to {}", exchange, result, this);
}
@ -87,7 +90,7 @@ public abstract class HttpChannel implements CyclicTimeouts.Expirable
public boolean disassociate(HttpExchange exchange)
{
boolean result = false;
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
HttpExchange existing = _exchange;
_exchange = null;
@ -113,12 +116,14 @@ public abstract class HttpChannel implements CyclicTimeouts.Expirable
*/
public HttpExchange getHttpExchange()
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
return _exchange;
}
}
protected abstract Connection getConnection();
@Override
public long getExpireNanoTime()
{

View File

@ -40,6 +40,7 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.HttpClient;
@ -70,6 +71,7 @@ public class HttpRequest implements Request
private final AtomicReference<Throwable> aborted = new AtomicReference<>();
private final HttpClient client;
private final HttpConversation conversation;
private Connection connection;
private String scheme;
private String host;
private int port;
@ -162,6 +164,17 @@ public class HttpRequest implements Request
return conversation;
}
@Override
public Connection getConnection()
{
return connection;
}
void setConnection(Connection connection)
{
this.connection = connection;
}
@Override
public String getScheme()
{

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.client.transport.internal;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.client.transport.HttpChannel;
@ -55,6 +56,12 @@ public class HttpChannelOverHTTP extends HttpChannel
return new HttpReceiverOverHTTP(this);
}
@Override
protected Connection getConnection()
{
return connection;
}
@Override
protected HttpSenderOverHTTP getHttpSender()
{

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.client.transport.internal;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
@ -100,6 +101,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
return delegate.getHttpDestination();
}
@Override
public SocketAddress getLocalSocketAddress()
{
return delegate.getLocalSocketAddress();
}
@Override
public SocketAddress getRemoteSocketAddress()
{
return delegate.getRemoteSocketAddress();
}
@Override
public long getBytesIn()
{
@ -288,6 +301,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
return Collections.<HttpChannel>singleton(channel).iterator();
}
@Override
public SocketAddress getLocalSocketAddress()
{
return getEndPoint().getLocalSocketAddress();
}
@Override
public SocketAddress getRemoteSocketAddress()
{
return getEndPoint().getRemoteSocketAddress();
}
@Override
public SendFailure send(HttpExchange exchange)
{

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.client.transport.internal.HttpConnectionOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
@ -67,26 +68,16 @@ public class HttpClientFailureTest
{
startServer(new EmptyServerHandler());
AtomicReference<HttpConnectionOverHTTP> connectionRef = new AtomicReference<>();
client = new HttpClient(new HttpClientTransportOverHTTP(1)
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpConnectionOverHTTP connection = (HttpConnectionOverHTTP)super.newConnection(endPoint, context);
connectionRef.set(connection);
return connection;
}
});
client = new HttpClient(new HttpClientTransportOverHTTP(1));
client.start();
assertThrows(ExecutionException.class, () ->
client.newRequest("localhost", connector.getLocalPort())
.onRequestHeaders(request -> connectionRef.get().getEndPoint().close())
.timeout(5, TimeUnit.SECONDS)
.send());
Request request = client.newRequest("localhost", connector.getLocalPort())
.onRequestHeaders(r -> r.getConnection().close())
.timeout(5, TimeUnit.SECONDS);
assertThrows(ExecutionException.class, request::send);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
HttpDestination destination = (HttpDestination)client.resolveDestination(request);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
assertEquals(0, connectionPool.getConnectionCount());
assertEquals(0, connectionPool.getActiveConnections().size());
assertEquals(0, connectionPool.getIdleConnections().size());

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.fcgi.client.transport.internal;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.client.transport.HttpChannel;
import org.eclipse.jetty.client.transport.HttpExchange;
@ -57,6 +58,12 @@ public class HttpChannelOverFCGI extends HttpChannel
this.request = request;
}
@Override
protected Connection getConnection()
{
return connection;
}
@Override
protected HttpSender getHttpSender()
{

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.fcgi.client.transport.internal;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
@ -88,6 +89,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
return destination;
}
@Override
public SocketAddress getLocalSocketAddress()
{
return delegate.getLocalSocketAddress();
}
@Override
public SocketAddress getRemoteSocketAddress()
{
return delegate.getRemoteSocketAddress();
}
protected Flusher getFlusher()
{
return flusher;
@ -334,6 +347,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
return Collections.<HttpChannel>singleton(channel).iterator();
}
@Override
public SocketAddress getLocalSocketAddress()
{
return getEndPoint().getLocalSocketAddress();
}
@Override
public SocketAddress getRemoteSocketAddress()
{
return getEndPoint().getRemoteSocketAddress();
}
@Override
public SendFailure send(HttpExchange exchange)
{

View File

@ -317,6 +317,21 @@ public class MultiPartFormData
this.filesDirectory = filesDirectory;
}
private Path findFilesDirectory()
{
Path dir = getFilesDirectory();
if (dir != null)
return dir;
String jettyBase = System.getProperty("jetty.base");
if (jettyBase != null)
{
dir = Path.of(jettyBase).resolve("work");
if (Files.exists(dir))
return dir;
}
throw new IllegalArgumentException("No files directory configured");
}
/**
* @return the maximum file size in bytes, or -1 for unlimited file size
*/
@ -627,7 +642,7 @@ public class MultiPartFormData
{
try (AutoLock ignored = lock.lock())
{
Path directory = getFilesDirectory();
Path directory = findFilesDirectory();
Files.createDirectories(directory);
String fileName = "MultiPart";
filePath = Files.createTempFile(directory, fileName, "");

View File

@ -912,6 +912,28 @@ public class MultiPartFormDataTest
assertThat(chunk.getFailure(), instanceOf(NumberFormatException.class));
}
@Test
public void testMissingFilesDirectory()
{
AsyncContent source = new TestContent();
MultiPartFormData.Parser formData = new MultiPartFormData.Parser("AaB03x");
// Always save to disk.
formData.setMaxMemoryFileSize(0);
String body = """
--AaB03x\r
Content-Disposition: form-data; name="file1"; filename="file.txt"\r
Content-Type: text/plain\r
\r
ABCDEFGHIJKLMNOPQRSTUVWXYZ\r
--AaB03x--\r
""";
Content.Sink.write(source, true, body, Callback.NOOP);
Throwable cause = assertThrows(ExecutionException.class, () -> formData.parse(source).get(5, TimeUnit.SECONDS)).getCause();
assertInstanceOf(IllegalArgumentException.class, cause);
}
private class TestContent extends AsyncContent
{
@Override

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.http2.client.transport.internal;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.client.transport.HttpChannel;
import org.eclipse.jetty.client.transport.HttpExchange;
@ -68,6 +69,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel
return listener;
}
@Override
protected Connection getConnection()
{
return connection;
}
@Override
protected HttpSender getHttpSender()
{

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http2.client.transport.internal;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.util.Iterator;
import java.util.Map;
@ -69,6 +70,18 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
return session;
}
@Override
public SocketAddress getLocalSocketAddress()
{
return session.getLocalSocketAddress();
}
@Override
public SocketAddress getRemoteSocketAddress()
{
return session.getRemoteSocketAddress();
}
public boolean isRecycleHttpChannels()
{
return recycleHttpChannels;

View File

@ -0,0 +1,132 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.tests;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class DynamicTableTest extends AbstractTest
{
@ParameterizedTest
@CsvSource({"0,-1", "-1,0", "0,0"})
public void testMaxEncoderTableCapacityZero(int clientMaxCapacity, int serverMaxCapacity) throws Exception
{
start(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
callback.succeeded();
return true;
}
});
if (clientMaxCapacity >= 0)
http2Client.setMaxEncoderTableCapacity(0);
if (serverMaxCapacity >= 0)
connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class).setMaxEncoderTableCapacity(serverMaxCapacity);
CountDownLatch serverPreface = new CountDownLatch(1);
Session session = newClientSession(new Session.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
serverPreface.countDown();
}
});
assertTrue(serverPreface.await(5, TimeUnit.SECONDS));
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(200, response.getStatus());
latch.countDown();
}
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@CsvSource({"0,-1", "-1,0", "0,0"})
public void testMaxDecoderTableCapacityZero(int clientMaxCapacity, int serverMaxCapacity) throws Exception
{
start(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
callback.succeeded();
return true;
}
});
if (clientMaxCapacity >= 0)
http2Client.setMaxDecoderTableCapacity(0);
if (serverMaxCapacity >= 0)
connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class).setMaxDecoderTableCapacity(serverMaxCapacity);
CountDownLatch serverPreface = new CountDownLatch(1);
Session session = newClientSession(new Session.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
serverPreface.countDown();
}
});
assertTrue(serverPreface.await(5, TimeUnit.SECONDS));
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(200, response.getStatus());
latch.countDown();
}
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.client.transport.internal;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.client.transport.HttpChannel;
import org.eclipse.jetty.client.transport.HttpExchange;
@ -54,6 +55,12 @@ public class HttpChannelOverHTTP3 extends HttpChannel
return receiver;
}
@Override
protected Connection getConnection()
{
return connection;
}
@Override
protected HttpSender getHttpSender()
{

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.client.transport.internal;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.util.Iterator;
import java.util.Set;
@ -51,6 +52,18 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio
return session;
}
@Override
public SocketAddress getLocalSocketAddress()
{
return session.getLocalSocketAddress();
}
@Override
public SocketAddress getRemoteSocketAddress()
{
return session.getRemoteSocketAddress();
}
@Override
public int getMaxMultiplex()
{

View File

@ -212,10 +212,6 @@ public interface Request extends Attributes, Content.Source
/**
* {@inheritDoc}
* @param demandCallback the demand callback to invoke when there is a content chunk available.
* In addition to the invocation guarantees of {@link Content.Source#demand(Runnable)},
* this implementation serializes the invocation of the {@code Runnable} with
* invocations of any {@link Response#write(boolean, ByteBuffer, Callback)}
* {@code Callback} invocations.
* @see Content.Source#demand(Runnable)
*/
@Override

View File

@ -152,9 +152,6 @@ public interface Response extends Content.Sink
* has returned.</p>
* <p>Thus a {@code Callback} should not block waiting for a callback
* of a future call to this method.</p>
* <p>Furthermore, the invocation of the passed callback is serialized
* with invocations of the {@link Runnable} demand callback passed to
* {@link Request#demand(Runnable)}.</p>
*
* @param last whether the ByteBuffer is the last to write
* @param byteBuffer the ByteBuffer to write

View File

@ -99,7 +99,8 @@ public class HttpChannelState implements HttpChannel, Components
private final AutoLock _lock = new AutoLock();
private final HandlerInvoker _handlerInvoker = new HandlerInvoker();
private final ConnectionMetaData _connectionMetaData;
private final SerializedInvoker _serializedInvoker;
private final SerializedInvoker _readInvoker;
private final SerializedInvoker _writeInvoker;
private final ResponseHttpFields _responseHeaders = new ResponseHttpFields();
private Thread _handling;
private boolean _handled;
@ -122,7 +123,8 @@ public class HttpChannelState implements HttpChannel, Components
{
_connectionMetaData = connectionMetaData;
// The SerializedInvoker is used to prevent infinite recursion of callbacks calling methods calling callbacks etc.
_serializedInvoker = new HttpChannelSerializedInvoker();
_readInvoker = new HttpChannelSerializedInvoker();
_writeInvoker = new HttpChannelSerializedInvoker();
}
@Override
@ -298,7 +300,7 @@ public class HttpChannelState implements HttpChannel, Components
onContent = _onContentAvailable;
_onContentAvailable = null;
}
return _serializedInvoker.offer(onContent);
return _readInvoker.offer(onContent);
}
@Override
@ -341,13 +343,13 @@ public class HttpChannelState implements HttpChannel, Components
// If there was a pending IO operation, deliver the idle timeout via them.
if (invokeOnContentAvailable != null || invokeWriteFailure != null)
return _serializedInvoker.offer(invokeOnContentAvailable, invokeWriteFailure);
return Invocable.combine(_readInvoker.offer(invokeOnContentAvailable), _writeInvoker.offer(invokeWriteFailure));
// Otherwise, if there are idle timeout listeners, ask them whether we should call onFailure.
Predicate<TimeoutException> onIdleTimeout = _onIdleTimeout;
if (onIdleTimeout != null)
{
return _serializedInvoker.offer(() ->
return () ->
{
if (onIdleTimeout.test(t))
{
@ -356,7 +358,7 @@ public class HttpChannelState implements HttpChannel, Components
if (task != null)
task.run();
}
});
};
}
}
@ -426,7 +428,7 @@ public class HttpChannelState implements HttpChannel, Components
};
// Serialize all the error actions.
task = _serializedInvoker.offer(invokeOnContentAvailable, invokeWriteFailure, invokeOnFailureListeners);
task = Invocable.combine(_readInvoker.offer(invokeOnContentAvailable), _writeInvoker.offer(invokeWriteFailure), invokeOnFailureListeners);
}
}
@ -912,7 +914,7 @@ public class HttpChannelState implements HttpChannel, Components
if (error)
{
httpChannelState._serializedInvoker.run(demandCallback);
httpChannelState._readInvoker.run(demandCallback);
}
else if (interimCallback == null)
{
@ -1189,14 +1191,14 @@ public class HttpChannelState implements HttpChannel, Components
if (writeFailure == NOTHING_TO_SEND)
{
httpChannelState._serializedInvoker.run(callback::succeeded);
httpChannelState._writeInvoker.run(callback::succeeded);
return;
}
// Have we failed in some way?
if (writeFailure != null)
{
Throwable failure = writeFailure;
httpChannelState._serializedInvoker.run(() -> callback.failed(failure));
httpChannelState._writeInvoker.run(() -> callback.failed(failure));
return;
}
@ -1235,7 +1237,7 @@ public class HttpChannelState implements HttpChannel, Components
httpChannel.lockedStreamSendCompleted(true);
}
if (callback != null)
httpChannel._serializedInvoker.run(callback::succeeded);
httpChannel._writeInvoker.run(callback::succeeded);
}
/**
@ -1263,7 +1265,7 @@ public class HttpChannelState implements HttpChannel, Components
httpChannel.lockedStreamSendCompleted(false);
}
if (callback != null)
httpChannel._serializedInvoker.run(() -> callback.failed(x));
httpChannel._writeInvoker.run(() -> callback.failed(x));
}
@Override

View File

@ -1220,15 +1220,15 @@ public class HttpChannelTest
assertThat(chunk.getFailure(), sameInstance(failure));
CountDownLatch demand = new CountDownLatch(1);
// Demand callback serialized until after onFailure listeners.
// Demand callback not serialized until after onFailure listeners.
rq.demand(demand::countDown);
assertThat(demand.getCount(), is(1L));
assertThat(demand.getCount(), is(0L));
FuturePromise<Throwable> callback = new FuturePromise<>();
// Write callback serialized until after onFailure listeners.
// Write callback not serialized until after onFailure listeners.
handling.get().write(false, null, Callback.from(() ->
{}, callback::succeeded));
assertFalse(callback.isDone());
assertTrue(callback.isDone());
// Process onFailure task.
try (StacklessLogging ignore = new StacklessLogging(Response.class))

View File

@ -194,7 +194,7 @@ public class RequestListenersTest
int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500;
assertEquals(expectedStatus, response.getStatus());
assertThat(failureLatch.await(1, TimeUnit.SECONDS), is(failIdleTimeout));
assertThat(failureLatch.await(idleTimeout + 500, TimeUnit.MILLISECONDS), is(failIdleTimeout && !succeedCallback));
}
@ParameterizedTest

View File

@ -1018,6 +1018,23 @@ public class HttpClientTest extends AbstractTest
assertThat(listener.result.getFailure().getMessage(), is("Synthetic Failure"));
}
@ParameterizedTest
@MethodSource("transports")
public void testRequestConnection(Transport transport) throws Exception
{
start(transport, new EmptyServerHandler());
ContentResponse response = client.newRequest(newURI(transport))
.onRequestBegin(r ->
{
if (r.getConnection() == null)
r.abort(new IllegalStateException());
})
.send();
assertEquals(200, response.getStatus());
}
private static void sleep(long time) throws IOException
{
try

View File

@ -358,12 +358,14 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
@Override
public String toString()
{
return String.format("%s@%x[inUse=%d,size=%d,max=%d,terminated=%b]",
return String.format("%s@%x[strategy=%s,inUse=%d,size=%d,max=%d,leaked=%d,terminated=%b]",
getClass().getSimpleName(),
hashCode(),
strategyType,
getInUseCount(),
size(),
getMaxSize(),
getLeaked(),
isTerminated());
}
@ -492,7 +494,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
{
boolean enabled = state.compareAndSet(0, 0, -1, acquire ? 1 : 0);
if (enabled && !acquire)
getHolder().hold();
getHolder().released();
return enabled;
}
@ -527,7 +529,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
if (state.compareAndSet(encoded, 0, newMultiplexCount))
{
if (newMultiplexCount == 1)
getHolder().free();
getHolder().acquired();
return true;
}
}
@ -554,7 +556,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
if (state.compareAndSet(encoded, 0, newMultiplexCount))
{
if (newMultiplexCount == 0)
getHolder().hold();
getHolder().released();
return true;
}
}
@ -653,14 +655,14 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
/**
* <p>Holds a strong and a weak reference to an {@link Entry} to avoid holding
* on to entries that are not released, so that they can be garbage collected.</p>
* <p>Methods {@link #hold()} and {@link #free()} work together to clear the
* <p>Methods {@link #released()} and {@link #acquired()} work together to clear the
* strong reference when the entry is acquired, and assign it when the entry
* is released.</p>
* <p>This class handles a race condition happening when an entry is being
* released with multiplex count going {@code 1 -> 0} by one thread that
* has not yet called {@link #hold()}, and immediately acquired by another
* thread that is calling {@link #free()}.
* The call to {@link #free()} spin loops until {@link #hold()} returns.</p>
* has not yet called {@link #released()}, and immediately acquired by another
* thread that is calling {@link #acquired()}.
* The call to {@link #acquired()} spin loops until {@link #released()} returns.</p>
*
* @param <P>
*/
@ -682,7 +684,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
/**
* <p>Called when an entry is released to the pool with multiplex count going from {@code 1} to {@code 0}.</p>
*/
public void hold()
public void released()
{
_strong = _weak.get();
}
@ -690,7 +692,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
/**
* <p>Called when an entry is acquired from the pool with multiplex count going from {@code 0} to {@code 1}.</p>
*/
public void free()
public void acquired()
{
ConcurrentEntry<P> entry = _weak.get();
if (entry == null)
@ -705,7 +707,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
@Override
public String toString()
{
return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _weak.get(), _strong);
return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _strong == null ? "acquired" : "released", _weak.get());
}
}
}

View File

@ -40,11 +40,11 @@ public class PathResource extends Resource
{
private static final Logger LOG = LoggerFactory.getLogger(PathResource.class);
public static Index<String> SUPPORTED_SCHEMES = new Index.Builder<String>()
.caseSensitive(false)
.with("file")
.with("jrt")
.build();
/**
* @deprecated Using ResourceFactoryInternals.isSupported() instead.
*/
@Deprecated(since = "12.0.4", forRemoval = true)
public static Index<String> SUPPORTED_SCHEMES = new Index.Builder<String>().build();
// The path object represented by this instance
private final Path path;
@ -164,7 +164,7 @@ public class PathResource extends Resource
{
if (!uri.isAbsolute())
throw new IllegalArgumentException("not an absolute uri: " + uri);
if (!bypassAllowedSchemeCheck && !SUPPORTED_SCHEMES.contains(uri.getScheme()))
if (!bypassAllowedSchemeCheck && !ResourceFactoryInternals.isSupported(uri.getScheme()))
throw new IllegalArgumentException("not an allowed scheme: " + uri);
if (Files.isDirectory(path))

View File

@ -125,7 +125,7 @@ public interface Invocable
* @param task the Runnable
* @return a new Task
*/
public static Task from(InvocationType type, Runnable task)
static Task from(InvocationType type, Runnable task)
{
return new ReadyTask(type, task);
}
@ -202,4 +202,43 @@ public interface Invocable
{
return InvocationType.BLOCKING;
}
/**
* Combine {@link Runnable}s into a single {@link Runnable} that sequentially calls the others.
* @param runnables the {@link Runnable}s to combine
* @return the combined {@link Runnable} with a combined {@link InvocationType}.
*/
static Runnable combine(Runnable... runnables)
{
Runnable result = null;
for (Runnable runnable : runnables)
{
if (runnable == null)
continue;
if (result == null)
{
result = runnable;
}
else
{
Runnable first = result;
result = new Task()
{
@Override
public void run()
{
first.run();
runnable.run();
}
@Override
public InvocationType getInvocationType()
{
return combine(Invocable.getInvocationType(first), Invocable.getInvocationType(runnable));
}
};
}
}
return result;
}
}

View File

@ -0,0 +1,83 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.util.thread.Invocable.InvocationType.BLOCKING;
import static org.eclipse.jetty.util.thread.Invocable.InvocationType.EITHER;
import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class InvocableTest
{
@Test
public void testCombineType()
{
assertThat(Invocable.combine(null, null), is(BLOCKING));
assertThat(Invocable.combine(null, BLOCKING), is(BLOCKING));
assertThat(Invocable.combine(null, NON_BLOCKING), is(BLOCKING));
assertThat(Invocable.combine(null, EITHER), is(BLOCKING));
assertThat(Invocable.combine(BLOCKING, null), is(BLOCKING));
assertThat(Invocable.combine(BLOCKING, BLOCKING), is(BLOCKING));
assertThat(Invocable.combine(BLOCKING, NON_BLOCKING), is(BLOCKING));
assertThat(Invocable.combine(BLOCKING, EITHER), is(BLOCKING));
assertThat(Invocable.combine(NON_BLOCKING, null), is(BLOCKING));
assertThat(Invocable.combine(NON_BLOCKING, BLOCKING), is(BLOCKING));
assertThat(Invocable.combine(NON_BLOCKING, NON_BLOCKING), is(NON_BLOCKING));
assertThat(Invocable.combine(NON_BLOCKING, EITHER), is(NON_BLOCKING));
assertThat(Invocable.combine(EITHER, null), is(BLOCKING));
assertThat(Invocable.combine(EITHER, BLOCKING), is(BLOCKING));
assertThat(Invocable.combine(EITHER, NON_BLOCKING), is(NON_BLOCKING));
assertThat(Invocable.combine(EITHER, EITHER), is(EITHER));
}
@Test
public void testCombineRunnable()
{
Queue<String> history = new ConcurrentLinkedQueue<>();
assertThat(Invocable.combine(), nullValue());
assertThat(Invocable.combine((Runnable)null), nullValue());
assertThat(Invocable.combine(null, (Runnable)null), nullValue());
Runnable r1 = () -> history.add("R1");
Runnable r2 = () -> history.add("R2");
Runnable r3 = () -> history.add("R3");
assertThat(Invocable.combine(r1, null, null), sameInstance(r1));
assertThat(Invocable.combine(null, r2, null), sameInstance(r2));
assertThat(Invocable.combine(null, null, r3), sameInstance(r3));
Runnable r13 = Invocable.combine(r1, null, r3);
history.clear();
r13.run();
assertThat(history, contains("R1", "R3"));
Runnable r123 = Invocable.combine(r1, r2, r3);
history.clear();
r123.run();
assertThat(history, contains("R1", "R2", "R3"));
}
}

View File

@ -478,6 +478,13 @@ public class ServletChannel
// be dispatched to an error page, so we delegate this responsibility to the ErrorHandler.
reopen();
_state.errorHandling();
// TODO We currently directly call the errorHandler here, but this is not correct in the case of async errors,
// because since a failure has already occurred, the errorHandler is unable to write a response.
// Instead, we should fail the callback, so that it calls Response.writeError(...) with an ErrorResponse
// that ignores existing failures. However, the error handler needs to be able to call servlet pages,
// so it will need to do a new call to associate(req,res,callback) or similar, to make the servlet request and
// response wrap the error request and response. Have to think about what callback is passed.
errorHandler.handle(getServletContextRequest(), getServletContextResponse(), Callback.from(_state::errorHandlingComplete));
}
}

View File

@ -151,6 +151,7 @@ public class ServletChannelState
private long _timeoutMs = DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
private Thread _onTimeoutThread;
private boolean _failureListener;
protected ServletChannelState(ServletChannel servletChannel)
{
@ -511,6 +512,11 @@ public class ServletChannelState
if (_state != State.HANDLING || (_requestState != RequestState.BLOCKING && _requestState != RequestState.ERRORING))
throw new IllegalStateException(this.getStatusStringLocked());
if (!_failureListener)
{
_failureListener = true;
_servletChannel.getRequest().addFailureListener(this::asyncError);
}
_requestState = RequestState.ASYNC;
_event = event;
lastAsyncListeners = _asyncListeners;
@ -1099,6 +1105,7 @@ public class ServletChannelState
_asyncWritePossible = false;
_timeoutMs = DEFAULT_TIMEOUT;
_event = null;
_failureListener = false;
}
}

View File

@ -83,6 +83,7 @@ public class ServletTest
@Test
public void testSimpleIdleIgnored() throws Exception
{
long idleTimeout = 1000;
_context.addServlet(new HttpServlet()
{
@Override
@ -90,7 +91,7 @@ public class ServletTest
{
try
{
Thread.sleep(1000);
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException e)
{
@ -100,13 +101,13 @@ public class ServletTest
}
}, "/get");
_connector.setIdleTimeout(250);
_connector.setIdleTimeout(idleTimeout);
_server.start();
String response = _connector.getResponse("""
GET /ctx/get HTTP/1.0
""", 5, TimeUnit.SECONDS);
""", 5 * idleTimeout, TimeUnit.MILLISECONDS);
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Hello!"));
}
@ -114,6 +115,7 @@ public class ServletTest
@Test
public void testSimpleIdleRead() throws Exception
{
long idleTimeout = 1000;
_context.addServlet(new HttpServlet()
{
@Override
@ -124,7 +126,7 @@ public class ServletTest
}
}, "/post");
_connector.setIdleTimeout(250);
_connector.setIdleTimeout(idleTimeout);
_server.start();
try (LocalConnector.LocalEndPoint endPoint = _connector.connect())
@ -148,7 +150,7 @@ public class ServletTest
assertThat(response, containsString("Hello 1234567890"));
endPoint.addInputAndExecute(request);
response = endPoint.getResponse(false, 5, TimeUnit.SECONDS);
response = endPoint.getResponse(false, 2 * idleTimeout, TimeUnit.MILLISECONDS);
assertThat(response, containsString(" 500 "));
assertThat(response, containsString("Connection: close"));
}
@ -166,7 +168,8 @@ public class ServletTest
}
}, "/get");
_connector.setIdleTimeout(250);
long idleTimeout = 1000;
_connector.setIdleTimeout(idleTimeout);
_server.start();
try (LocalConnector.LocalEndPoint endPoint = _connector.connect())
@ -177,15 +180,15 @@ public class ServletTest
""";
endPoint.addInput(request);
String response = endPoint.getResponse();
String response = endPoint.getResponse(false, 5, TimeUnit.SECONDS);
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Hello!"));
endPoint.addInput(request);
response = endPoint.getResponse();
response = endPoint.getResponse(false, 5, TimeUnit.SECONDS);
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Hello!"));
Thread.sleep(500);
Thread.sleep(2 * idleTimeout);
assertFalse(endPoint.isOpen());
}

View File

@ -92,7 +92,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@Disabled
public class AsyncIOServletTest extends AbstractTest
{
private static final ThreadLocal<RuntimeException> scope = new ThreadLocal<>();
@ -1081,6 +1080,7 @@ public class AsyncIOServletTest extends AbstractTest
@ParameterizedTest
@MethodSource("transportsNoFCGI")
@Disabled // TODO Cannot write response from onError as failure has occurred
public void testAsyncReadEarlyEOF(Transport transport) throws Exception
{
// SSLEngine receives the close alert from the client, and when
@ -1197,8 +1197,8 @@ public class AsyncIOServletTest extends AbstractTest
}
@ParameterizedTest
@MethodSource("transportsNoFCGI")
public void testAsyncEcho(Transport transport) throws Exception
@MethodSource("transports")
public void testAsyncReadEcho(Transport transport) throws Exception
{
// TODO: investigate why H3 does not work.
Assumptions.assumeTrue(transport != Transport.H3);
@ -1208,8 +1208,6 @@ public class AsyncIOServletTest extends AbstractTest
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
System.err.println("service " + request);
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
@ -1222,7 +1220,6 @@ public class AsyncIOServletTest extends AbstractTest
int b = input.read();
if (b >= 0)
{
// System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b));
response.getOutputStream().write(b);
}
else

View File

@ -60,7 +60,7 @@
<jakarta.xml.jaxws.impl.version>4.0.0</jakarta.xml.jaxws.impl.version>
<jakarta.xml.ws.api.version>4.0.0</jakarta.xml.ws.api.version>
<jsp.impl.version>10.1.7</jsp.impl.version>
<jsp.impl.version>10.1.16</jsp.impl.version>
<mail.impl.version>2.0.1</mail.impl.version>
<sonar.skip>true</sonar.skip>

View File

@ -49,7 +49,7 @@
<jakarta.websocket.api.version>1.1.2</jakarta.websocket.api.version>
<javax.mail.glassfish.version>1.4.1.v201005082020</javax.mail.glassfish.version>
<jetty.servlet.api.version>4.0.6</jetty.servlet.api.version>
<jsp.impl.version>9.0.52</jsp.impl.version>
<jsp.impl.version>9.0.83.1</jsp.impl.version>
<modify-sources-plugin.version>1.0.9</modify-sources-plugin.version>
<sonar.skip>true</sonar.skip>
<weld.version>3.1.9.Final</weld.version>

View File

@ -146,6 +146,7 @@ public class HttpChannelState
private long _timeoutMs = DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
private Thread _onTimeoutThread;
private boolean _failureListener;
protected HttpChannelState(HttpChannel channel)
{
@ -530,6 +531,11 @@ public class HttpChannelState
if (_state != State.HANDLING || _requestState != RequestState.BLOCKING)
throw new IllegalStateException(this.getStatusStringLocked());
if (!_failureListener)
{
_failureListener = true;
getHttpChannel().getCoreRequest().addFailureListener(this::asyncError);
}
_requestState = RequestState.ASYNC;
_event = event;
lastAsyncListeners = _asyncListeners;
@ -1066,6 +1072,7 @@ public class HttpChannelState
_asyncWritePossible = false;
_timeoutMs = DEFAULT_TIMEOUT;
_event = null;
_failureListener = false;
}
}

View File

@ -83,6 +83,7 @@ public class ServletTest
@Test
public void testSimpleIdleIgnored() throws Exception
{
long idleTimeout = 1000;
_context.addServlet(new ServletHolder(new HttpServlet()
{
@Override
@ -90,7 +91,7 @@ public class ServletTest
{
try
{
Thread.sleep(1000);
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException e)
{
@ -100,13 +101,13 @@ public class ServletTest
}
}), "/get");
_connector.setIdleTimeout(250);
_connector.setIdleTimeout(idleTimeout);
_server.start();
String response = _connector.getResponse("""
GET /ctx/get HTTP/1.0
""", 5, TimeUnit.SECONDS);
""", 5 * idleTimeout, TimeUnit.MILLISECONDS);
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Hello!"));
}
@ -114,6 +115,7 @@ public class ServletTest
@Test
public void testSimpleIdleRead() throws Exception
{
long idleTimeout = 1000;
_context.addServlet(new ServletHolder(new HttpServlet()
{
@Override
@ -124,7 +126,7 @@ public class ServletTest
}
}), "/post");
_connector.setIdleTimeout(250);
_connector.setIdleTimeout(idleTimeout);
_server.start();
try (LocalConnector.LocalEndPoint endPoint = _connector.connect())
@ -148,7 +150,7 @@ public class ServletTest
assertThat(response, containsString("Hello 1234567890"));
endPoint.addInputAndExecute(request);
response = endPoint.getResponse(false, 5, TimeUnit.SECONDS);
response = endPoint.getResponse(false, 2 * idleTimeout, TimeUnit.MILLISECONDS);
assertThat(response, containsString(" 500 "));
assertThat(response, containsString("Connection: close"));
}
@ -166,7 +168,8 @@ public class ServletTest
}
}), "/get");
_connector.setIdleTimeout(250);
long idleTimeout = 1000;
_connector.setIdleTimeout(idleTimeout);
_server.start();
try (LocalConnector.LocalEndPoint endPoint = _connector.connect())
@ -177,15 +180,15 @@ public class ServletTest
""";
endPoint.addInput(request);
String response = endPoint.getResponse();
String response = endPoint.getResponse(false, 5, TimeUnit.SECONDS);
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Hello!"));
endPoint.addInput(request);
response = endPoint.getResponse();
response = endPoint.getResponse(false, 5, TimeUnit.SECONDS);
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Hello!"));
Thread.sleep(500);
Thread.sleep(2 * idleTimeout);
assertFalse(endPoint.isOpen());
}

View File

@ -94,7 +94,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@Disabled
public class AsyncIOServletTest extends AbstractTest
{
private static final ThreadLocal<RuntimeException> scope = new ThreadLocal<>();
@ -1083,6 +1082,7 @@ public class AsyncIOServletTest extends AbstractTest
@ParameterizedTest
@MethodSource("transportsNoFCGI")
@Disabled // TODO Cannot write response from onError as failure has occurred
public void testAsyncReadEarlyEOF(Transport transport) throws Exception
{
// SSLEngine receives the close alert from the client, and when
@ -1200,6 +1200,7 @@ public class AsyncIOServletTest extends AbstractTest
@ParameterizedTest
@MethodSource("transportsNoFCGI")
@Disabled // TODO
public void testAsyncIntercepted(Transport transport) throws Exception
{
start(transport, new HttpServlet()
@ -1550,6 +1551,7 @@ public class AsyncIOServletTest extends AbstractTest
@ParameterizedTest
@MethodSource("transportsNoFCGI")
@Disabled // TODO
public void testAsyncInterceptedTwiceWithNulls(Transport transport) throws Exception
{
start(transport, new HttpServlet()

View File

@ -69,7 +69,7 @@
<!-- FIXME we need a separate property for this one -->
<jetty.servlet.api.version>5.0.2</jetty.servlet.api.version>
<jsp.impl.version>10.0.14</jsp.impl.version>
<jsp.impl.version>10.0.27</jsp.impl.version>
<sonar.skip>true</sonar.skip>
<weld.version>4.0.3.Final</weld.version>