Issue #6728 - Prepare for QUIC and HTTP/3 (#7100)

* Issue #6728 - QUIC and HTTP/3

Modifications to Jetty to prepare for QUIC/HTTP3.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-12-01 09:39:51 +01:00 committed by GitHub
parent 09f4e3c6a2
commit cb25a83667
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 915 additions and 593 deletions

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
@ -501,9 +502,9 @@ public class NetworkTrafficListenerTest
super(new HttpClientTransportOverHTTP(new ClientConnector()
{
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey)
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
return new NetworkTrafficSocketChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout().toMillis(), listener.get());
return new NetworkTrafficSocketChannelEndPoint((SocketChannel)channel, selector, selectionKey, getScheduler(), getIdleTimeout().toMillis(), listener.get());
}
}));
this.listener = listener;

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@ -26,6 +27,9 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Partial implementation of EndPoint that uses {@link FillInterest} and {@link WriteFlusher}.</p>
*/
public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractEndPoint.class);
@ -33,7 +37,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
private final long _created = System.currentTimeMillis();
private volatile Connection _connection;
private final FillInterest _fillInterest = new FillInterest()
{
@Override
@ -42,7 +45,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
AbstractEndPoint.this.needsFillInterest();
}
};
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
@ -299,41 +301,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
@Override
public boolean isOpen()
{
switch (_state.get())
{
case CLOSED:
return false;
default:
return true;
}
}
public void checkFlush() throws IOException
{
State s = _state.get();
switch (s)
{
case OSHUT:
case OSHUTTING:
case CLOSED:
throw new IOException(s.toString());
default:
break;
}
}
public void checkFill() throws IOException
{
State s = _state.get();
switch (s)
{
case ISHUT:
case ISHUTTING:
case CLOSED:
throw new IOException(s.toString());
default:
break;
}
return _state.get() != State.CLOSED;
}
@Override
@ -413,7 +381,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
}
@Override
public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
_writeFlusher.write(callback, buffers);
}
@ -486,22 +454,12 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
@Override
public String toString()
{
return String.format("%s->%s", toEndPointString(), toConnectionString());
return String.format("%s@%x[%s]->[%s]", getClass().getSimpleName(), hashCode(), toEndPointString(), toConnectionString());
}
public String toEndPointString()
{
Class<?> c = getClass();
String name = c.getSimpleName();
while (name.length() == 0 && c.getSuperclass() != null)
{
c = c.getSuperclass();
name = c.getSimpleName();
}
return String.format("%s@%h{l=%s,r=%s,%s,fill=%s,flush=%s,to=%d/%d}",
name,
this,
return String.format("{l=%s,r=%s,%s,fill=%s,flush=%s,to=%d/%d}",
getLocalSocketAddress(),
getRemoteSocketAddress(),
_state.get(),

View File

@ -21,14 +21,15 @@ import java.net.SocketException;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.IO;
@ -76,6 +77,7 @@ public class ClientConnector extends ContainerLifeCycle
public static final String REMOTE_SOCKET_ADDRESS_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".remoteSocketAddress";
public static final String CLIENT_CONNECTION_FACTORY_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".clientConnectionFactory";
public static final String CONNECTION_PROMISE_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".connectionPromise";
public static final String APPLICATION_PROTOCOLS_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".applicationProtocols";
private static final Logger LOG = LoggerFactory.getLogger(ClientConnector.class);
/**
@ -86,10 +88,10 @@ public class ClientConnector extends ContainerLifeCycle
*/
public static ClientConnector forUnixDomain(Path path)
{
return new ClientConnector(SocketChannelWithAddress.Factory.forUnixDomain(path));
return new ClientConnector(Configurator.forUnixDomain(path));
}
private final SocketChannelWithAddress.Factory factory;
private final Configurator configurator;
private Executor executor;
private Scheduler scheduler;
private ByteBufferPool byteBufferPool;
@ -108,12 +110,23 @@ public class ClientConnector extends ContainerLifeCycle
public ClientConnector()
{
this((address, context) -> new SocketChannelWithAddress(SocketChannel.open(), address));
this(new Configurator());
}
private ClientConnector(SocketChannelWithAddress.Factory factory)
public ClientConnector(Configurator configurator)
{
this.factory = Objects.requireNonNull(factory);
this.configurator = Objects.requireNonNull(configurator);
addBean(configurator);
}
/**
* @param address the SocketAddress to connect to
* @return whether the connection to the given SocketAddress is intrinsically secure
* @see Configurator#isIntrinsicallySecure(ClientConnector, SocketAddress)
*/
public boolean isIntrinsicallySecure(SocketAddress address)
{
return configurator.isIntrinsicallySecure(this, address);
}
public Executor getExecutor()
@ -384,37 +397,45 @@ public class ClientConnector extends ContainerLifeCycle
public void connect(SocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
SelectableChannel channel = null;
try
{
if (context == null)
context = new HashMap<>();
context = new ConcurrentHashMap<>();
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
context.putIfAbsent(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY, address);
SocketChannelWithAddress channelWithAddress = factory.newSocketChannelWithAddress(address, context);
channel = channelWithAddress.getSocketChannel();
Configurator.ChannelWithAddress channelWithAddress = configurator.newChannelWithAddress(this, address, context);
channel = channelWithAddress.getSelectableChannel();
address = channelWithAddress.getSocketAddress();
configure(channel);
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
bind(channel, bindAddress);
if (bindAddress != null && channel instanceof NetworkChannel)
bind((NetworkChannel)channel, bindAddress);
boolean connected = true;
boolean blocking = isConnectBlocking() && address instanceof InetSocketAddress;
if (LOG.isDebugEnabled())
LOG.debug("Connecting {} to {}", blocking ? "blocking" : "non-blocking", address);
if (blocking)
if (channel instanceof SocketChannel)
{
channel.socket().connect(address, (int)getConnectTimeout().toMillis());
channel.configureBlocking(false);
SocketChannel socketChannel = (SocketChannel)channel;
boolean blocking = isConnectBlocking() && address instanceof InetSocketAddress;
if (LOG.isDebugEnabled())
LOG.debug("Connecting {} to {}", blocking ? "blocking" : "non-blocking", address);
if (blocking)
{
socketChannel.socket().connect(address, (int)getConnectTimeout().toMillis());
socketChannel.configureBlocking(false);
}
else
{
socketChannel.configureBlocking(false);
connected = socketChannel.connect(address);
}
}
else
{
channel.configureBlocking(false);
connected = channel.connect(address);
}
if (connected)
@ -435,10 +456,11 @@ public class ClientConnector extends ContainerLifeCycle
}
}
public void accept(SocketChannel channel, Map<String, Object> context)
public void accept(SelectableChannel selectable, Map<String, Object> context)
{
try
{
SocketChannel channel = (SocketChannel)selectable;
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
if (!channel.isConnected())
throw new IllegalStateException("SocketChannel must be connected");
@ -449,35 +471,39 @@ public class ClientConnector extends ContainerLifeCycle
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not accept {}", channel);
IO.close(channel);
LOG.debug("Could not accept {}", selectable);
IO.close(selectable);
Promise<?> promise = (Promise<?>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
if (promise != null)
promise.failed(failure);
}
}
private void bind(SocketChannel channel, SocketAddress bindAddress) throws IOException
private void bind(NetworkChannel channel, SocketAddress bindAddress) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("Binding {} to {}", channel, bindAddress);
channel.bind(bindAddress);
}
protected void configure(SocketChannel channel) throws IOException
protected void configure(SelectableChannel selectable) throws IOException
{
setSocketOption(channel, StandardSocketOptions.TCP_NODELAY, isTCPNoDelay());
setSocketOption(channel, StandardSocketOptions.SO_REUSEADDR, getReuseAddress());
setSocketOption(channel, StandardSocketOptions.SO_REUSEPORT, isReusePort());
int receiveBufferSize = getReceiveBufferSize();
if (receiveBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
int sendBufferSize = getSendBufferSize();
if (sendBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_SNDBUF, sendBufferSize);
if (selectable instanceof NetworkChannel)
{
NetworkChannel channel = (NetworkChannel)selectable;
setSocketOption(channel, StandardSocketOptions.TCP_NODELAY, isTCPNoDelay());
setSocketOption(channel, StandardSocketOptions.SO_REUSEADDR, getReuseAddress());
setSocketOption(channel, StandardSocketOptions.SO_REUSEPORT, isReusePort());
int receiveBufferSize = getReceiveBufferSize();
if (receiveBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
int sendBufferSize = getSendBufferSize();
if (sendBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_SNDBUF, sendBufferSize);
}
}
private <T> void setSocketOption(SocketChannel channel, SocketOption<T> option, T value)
private <T> void setSocketOption(NetworkChannel channel, SocketOption<T> option, T value)
{
try
{
@ -485,14 +511,23 @@ public class ClientConnector extends ContainerLifeCycle
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not configure {} to {} on {}", option, value, channel, x);
if (LOG.isTraceEnabled())
LOG.trace("Could not configure {} to {} on {}", option, value, channel, x);
}
}
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey)
protected EndPoint newEndPoint(SelectableChannel selectable, ManagedSelector selector, SelectionKey selectionKey)
{
return new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)selectionKey.attachment();
SocketAddress address = (SocketAddress)context.get(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
return configurator.newEndPoint(this, address, selectable, selector, selectionKey);
}
protected Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
SocketAddress address = (SocketAddress)context.get(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
return configurator.newConnection(this, address, endPoint, context);
}
protected void connectFailed(Throwable failure, Map<String, Object> context)
@ -514,7 +549,7 @@ public class ClientConnector extends ContainerLifeCycle
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
EndPoint endPoint = ClientConnector.this.newEndPoint((SocketChannel)channel, selector, selectionKey);
EndPoint endPoint = ClientConnector.this.newEndPoint(channel, selector, selectionKey);
endPoint.setIdleTimeout(getIdleTimeout().toMillis());
return endPoint;
}
@ -524,14 +559,15 @@ public class ClientConnector extends ContainerLifeCycle
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
ClientConnectionFactory factory = (ClientConnectionFactory)context.get(CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
return factory.newConnection(endPoint, context);
return ClientConnector.this.newConnection(endPoint, context);
}
@Override
public void connectionOpened(Connection connection, Object context)
{
super.connectionOpened(connection, context);
// TODO: the block below should be moved to Connection.onOpen() in each implementation,
// so that each implementation can decide when to notify the promise, possibly not in onOpen().
@SuppressWarnings("unchecked")
Map<String, Object> contextMap = (Map<String, Object>)context;
@SuppressWarnings("unchecked")
@ -550,37 +586,96 @@ public class ClientConnector extends ContainerLifeCycle
}
/**
* <p>A pair/record holding a {@link SocketChannel} and a {@link SocketAddress} to connect to.</p>
* <p>Configures a {@link ClientConnector}.</p>
*/
private static class SocketChannelWithAddress
public static class Configurator
{
private final SocketChannel channel;
private final SocketAddress address;
private SocketChannelWithAddress(SocketChannel channel, SocketAddress address)
/**
* <p>Returns whether the connection to a given {@link SocketAddress} is intrinsically secure.</p>
* <p>A protocol such as HTTP/1.1 can be transported by TCP; however, TCP is not secure because
* it does not offer any encryption.</p>
* <p>Encryption is provided by using TLS to wrap the HTTP/1.1 bytes, and then transporting the
* TLS bytes over TCP.</p>
* <p>On the other hand, protocols such as QUIC are intrinsically secure, and therefore it is
* not necessary to wrap the HTTP/1.1 bytes with TLS: the HTTP/1.1 bytes are transported over
* QUIC in an intrinsically secure way.</p>
*
* @param clientConnector the ClientConnector
* @param address the SocketAddress to connect to
* @return whether the connection to the given SocketAddress is intrinsically secure
*/
public boolean isIntrinsicallySecure(ClientConnector clientConnector, SocketAddress address)
{
this.channel = channel;
this.address = address;
}
private SocketChannel getSocketChannel()
{
return channel;
}
private SocketAddress getSocketAddress()
{
return address;
return false;
}
/**
* <p>A factory for {@link SocketChannelWithAddress} instances.</p>
* <p>Creates a new {@link SocketChannel} to connect to a {@link SocketAddress}
* derived from the input socket address.</p>
* <p>The input socket address represents the destination socket address to
* connect to, as it is typically specified by a URI authority, for example
* {@code localhost:8080} if the URI is {@code http://localhost:8080/path}.</p>
* <p>However, the returned socket address may be different as the implementation
* may use a Unix-Domain socket address to physically connect to the virtual
* destination socket address given as input.</p>
* <p>The return type is a pair/record holding the socket channel and the
* socket address, with the socket channel not yet connected.
* The implementation of this methods must not call
* {@link SocketChannel#connect(SocketAddress)}, as this is done later,
* after configuring the socket, by the {@link ClientConnector} implementation.</p>
*
* @param address the destination socket address, typically specified in a URI
* @param context the context to create the new socket channel
* @return a new {@link SocketChannel} with an associated {@link SocketAddress} to connect to
* @throws IOException if the socket channel or the socket address cannot be created
*/
private interface Factory
public ChannelWithAddress newChannelWithAddress(ClientConnector clientConnector, SocketAddress address, Map<String, Object> context) throws IOException
{
private static Factory forUnixDomain(Path path)
return new ChannelWithAddress(SocketChannel.open(), address);
}
public EndPoint newEndPoint(ClientConnector clientConnector, SocketAddress address, SelectableChannel selectable, ManagedSelector selector, SelectionKey selectionKey)
{
return new SocketChannelEndPoint((SocketChannel)selectable, selector, selectionKey, clientConnector.getScheduler());
}
public Connection newConnection(ClientConnector clientConnector, SocketAddress address, EndPoint endPoint, Map<String, Object> context) throws IOException
{
ClientConnectionFactory factory = (ClientConnectionFactory)context.get(CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
return factory.newConnection(endPoint, context);
}
/**
* <p>A pair/record holding a {@link SelectableChannel} and a {@link SocketAddress} to connect to.</p>
*/
public static class ChannelWithAddress
{
private final SelectableChannel channel;
private final SocketAddress address;
public ChannelWithAddress(SelectableChannel channel, SocketAddress address)
{
return (address, context) ->
this.channel = channel;
this.address = address;
}
public SelectableChannel getSelectableChannel()
{
return channel;
}
public SocketAddress getSocketAddress()
{
return address;
}
}
private static Configurator forUnixDomain(Path path)
{
return new Configurator()
{
@Override
public ChannelWithAddress newChannelWithAddress(ClientConnector clientConnector, SocketAddress address, Map<String, Object> context)
{
try
{
@ -588,37 +683,15 @@ public class ClientConnector extends ContainerLifeCycle
SocketChannel socketChannel = (SocketChannel)SocketChannel.class.getMethod("open", ProtocolFamily.class).invoke(null, family);
Class<?> addressClass = Class.forName("java.net.UnixDomainSocketAddress");
SocketAddress socketAddress = (SocketAddress)addressClass.getMethod("of", Path.class).invoke(null, path);
return new SocketChannelWithAddress(socketChannel, socketAddress);
return new ChannelWithAddress(socketChannel, socketAddress);
}
catch (Throwable x)
{
String message = "Unix-Domain SocketChannels are available starting from Java 16, your Java version is: " + JavaVersion.VERSION;
throw new UnsupportedOperationException(message, x);
}
};
}
/**
* <p>Creates a new {@link SocketChannel} to connect to a {@link SocketAddress}
* derived from the input socket address.</p>
* <p>The input socket address represents the destination socket address to
* connect to, as it is typically specified by a URI authority, for example
* {@code localhost:8080} if the URI is {@code http://localhost:8080/path}.</p>
* <p>However, the returned socket address may be different as the implementation
* may use a Unix-Domain socket address to physically connect to the virtual
* destination socket address given as input.</p>
* <p>The return type is a pair/record holding the socket channel and the
* socket address, with the socket channel not yet connected.
* The implementation of this methods must not call
* {@link SocketChannel#connect(SocketAddress)}, as this is done later,
* after configuring the socket, by the {@link ClientConnector} implementation.</p>
*
* @param address the destination socket address, typically specified in a URI
* @param context the context to create the new socket channel
* @return a new {@link SocketChannel} with an associated {@link SocketAddress} to connect to
* @throws IOException if the socket channel or the socket address cannot be created
*/
public SocketChannelWithAddress newSocketChannelWithAddress(SocketAddress address, Map<String, Object> context) throws IOException;
}
};
}
}
}

View File

@ -0,0 +1,132 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>An {@link EndPoint} implementation based on {@link DatagramChannel}.</p>
*/
public class DatagramChannelEndPoint extends SelectableChannelEndPoint
{
public static final SocketAddress EOF = InetSocketAddress.createUnresolved("", 0);
private static final Logger LOG = LoggerFactory.getLogger(DatagramChannelEndPoint.class);
public DatagramChannelEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler, channel, selector, key);
}
@Override
public DatagramChannel getChannel()
{
return (DatagramChannel)super.getChannel();
}
/**
* <p>Receives data into the given buffer from the returned address.</p>
* <p>This method should be used to receive UDP data.</p>
*
* @param buffer the buffer to fill with data
* @return the peer address that sent the data
* @throws IOException if the receive fails
*/
public SocketAddress receive(ByteBuffer buffer) throws IOException
{
if (isInputShutdown())
return EOF;
int pos = BufferUtil.flipToFill(buffer);
SocketAddress peer = getChannel().receive(buffer);
BufferUtil.flipToFlush(buffer, pos);
if (peer == null)
return null;
notIdle();
int filled = buffer.remaining();
if (LOG.isDebugEnabled())
LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer));
return peer;
}
/**
* <p>Sends to the given address the data in the given buffers.</p>
* <p>This methods should be used to send UDP data.</p>
*
* @param address the peer address to send data to
* @param buffers the buffers containing the data to send
* @return true if all the buffers have been consumed
* @throws IOException if the send fails
* @see #write(Callback, SocketAddress, ByteBuffer...)
*/
public boolean send(SocketAddress address, ByteBuffer... buffers) throws IOException
{
boolean flushedAll = true;
long flushed = 0;
try
{
if (LOG.isDebugEnabled())
LOG.debug("flushing {} buffer(s) to {}", buffers.length, address);
for (ByteBuffer buffer : buffers)
{
int sent = getChannel().send(buffer, address);
if (sent == 0)
{
flushedAll = false;
break;
}
flushed += sent;
}
if (LOG.isDebugEnabled())
LOG.debug("flushed {} byte(s), all flushed? {} - {}", flushed, flushedAll, this);
}
catch (IOException e)
{
throw new EofException(e);
}
if (flushed > 0)
notIdle();
return flushedAll;
}
/**
* <p>Writes to the given address the data contained in the given buffers, and invokes
* the given callback when either all the data has been sent, or a failure occurs.</p>
*
* @param callback the callback to notify of the success or failure of the write operation
* @param address the peer address to send data to
* @param buffers the buffers containing the data to send
* @throws WritePendingException if a previous write was initiated but was not yet completed
* @see #send(SocketAddress, ByteBuffer...)
*/
public void write(Callback callback, SocketAddress address, ByteBuffer... buffers) throws WritePendingException
{
getWriteFlusher().write(callback, address, buffers);
}
}

View File

@ -203,7 +203,10 @@ public interface EndPoint extends Closeable
* filled or -1 if EOF is read or the input is shutdown.
* @throws IOException if the endpoint is closed.
*/
int fill(ByteBuffer buffer) throws IOException;
default int fill(ByteBuffer buffer) throws IOException
{
throw new UnsupportedOperationException();
}
/**
* Flush data from the passed header/buffer to this endpoint. As many bytes as can be consumed
@ -215,7 +218,10 @@ public interface EndPoint extends Closeable
* destination (ie is not buffering any data).
* @throws IOException If the endpoint is closed or output is shutdown.
*/
boolean flush(ByteBuffer... buffer) throws IOException;
default boolean flush(ByteBuffer... buffer) throws IOException
{
throw new UnsupportedOperationException();
}
/**
* @return The underlying transport object (socket, channel, etc.)
@ -271,7 +277,10 @@ public interface EndPoint extends Closeable
* @param buffers one or more {@link ByteBuffer}s that will be flushed.
* @throws WritePendingException if another write operation is concurrent.
*/
void write(Callback callback, ByteBuffer... buffers) throws WritePendingException;
default void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
throw new UnsupportedOperationException();
}
/**
* @return the {@link Connection} associated with this EndPoint

View File

@ -990,36 +990,42 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override
public void update(Selector selector)
{
if (LOG.isDebugEnabled())
LOG.debug("Closing {} connections on {}", selector.keys().size(), ManagedSelector.this);
for (SelectionKey key : selector.keys())
try
{
if (key != null && key.isValid())
if (LOG.isDebugEnabled())
LOG.debug("Closing {} connections on {}", selector.keys().size(), ManagedSelector.this);
for (SelectionKey key : selector.keys())
{
Closeable closeable = null;
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
if (key != null && key.isValid())
{
EndPoint endPoint = (EndPoint)attachment;
Connection connection = endPoint.getConnection();
closeable = Objects.requireNonNullElse(connection, endPoint);
}
if (closeable != null)
{
if (_closed == null)
Closeable closeable = null;
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
{
IO.close(closeable);
EndPoint endPoint = (EndPoint)attachment;
Connection connection = endPoint.getConnection();
closeable = Objects.requireNonNullElse(connection, endPoint);
}
else if (!_closed.contains(closeable))
if (closeable != null)
{
_closed.add(closeable);
IO.close(closeable);
if (_closed == null)
{
IO.close(closeable);
}
else if (!_closed.contains(closeable))
{
_closed.add(closeable);
IO.close(closeable);
}
}
}
}
}
_complete.countDown();
finally
{
_complete.countDown();
}
}
}
@ -1030,18 +1036,24 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override
public void update(Selector selector)
{
for (SelectionKey key : selector.keys())
try
{
// Key may be null when using the UnixSocket selector.
if (key == null)
continue;
Object attachment = key.attachment();
if (attachment instanceof Closeable)
IO.close((Closeable)attachment);
for (SelectionKey key : selector.keys())
{
// Key may be null when using the UnixSocket selector.
if (key == null)
continue;
Object attachment = key.attachment();
if (attachment instanceof Closeable)
IO.close((Closeable)attachment);
}
_selector = null;
IO.close(selector);
}
finally
{
_stopped.countDown();
}
_selector = null;
IO.close(selector);
_stopped.countDown();
}
}

View File

@ -0,0 +1,337 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A partial {@link EndPoint} implementation based on {@link SelectableChannel}.</p>
*/
public abstract class SelectableChannelEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable
{
private static final Logger LOG = LoggerFactory.getLogger(SelectableChannelEndPoint.class);
private final AutoLock _lock = new AutoLock();
private final SelectableChannel _channel;
private final ManagedSelector _selector;
private SelectionKey _key;
private boolean _updatePending;
// The current value for interestOps.
private int _currentInterestOps;
// The desired value for interestOps.
private int _desiredInterestOps;
private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction;
private final Runnable _runFillable = new RunnableCloseable("runFillable")
{
@Override
public void run()
{
getFillInterest().fillable();
}
@Override
public InvocationType getInvocationType()
{
return getFillInterest().getCallbackInvocationType();
}
};
private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite")
{
@Override
public void run()
{
getWriteFlusher().completeWrite();
}
@Override
public InvocationType getInvocationType()
{
return getWriteFlusher().getCallbackInvocationType();
}
@Override
public String toString()
{
return String.format("%s:%s:%s->%s", SelectableChannelEndPoint.this, _operation, getInvocationType(), getWriteFlusher());
}
};
private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable")
{
@Override
public void run()
{
getWriteFlusher().completeWrite();
getFillInterest().fillable();
}
@Override
public InvocationType getInvocationType()
{
InvocationType fillT = getFillInterest().getCallbackInvocationType();
InvocationType flushT = getWriteFlusher().getCallbackInvocationType();
if (fillT == flushT)
return fillT;
if (fillT == InvocationType.EITHER && flushT == InvocationType.NON_BLOCKING)
return InvocationType.EITHER;
if (fillT == InvocationType.NON_BLOCKING && flushT == InvocationType.EITHER)
return InvocationType.EITHER;
return InvocationType.BLOCKING;
}
};
public SelectableChannelEndPoint(Scheduler scheduler, SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
super(scheduler);
_channel = channel;
_selector = selector;
_key = selectionKey;
}
public SelectableChannel getChannel()
{
return _channel;
}
@Override
public Object getTransport()
{
return getChannel();
}
@Override
public SocketAddress getLocalSocketAddress()
{
try
{
SelectableChannel channel = getChannel();
if (channel instanceof NetworkChannel)
return ((NetworkChannel)channel).getLocalAddress();
return super.getLocalSocketAddress();
}
catch (Throwable x)
{
LOG.trace("Could not retrieve local socket address", x);
return null;
}
}
@Override
public boolean isOpen()
{
return _channel.isOpen();
}
@Override
public void doClose()
{
if (LOG.isDebugEnabled())
LOG.debug("doClose {}", this);
IO.close(_channel);
super.doClose();
}
@Override
public void onClose(Throwable cause)
{
try
{
super.onClose(cause);
}
finally
{
if (_selector != null)
_selector.destroyEndPoint(this, cause);
}
}
@Override
protected void needsFillInterest()
{
changeInterests(SelectionKey.OP_READ);
}
@Override
protected void onIncompleteFlush()
{
changeInterests(SelectionKey.OP_WRITE);
}
private void changeInterests(int operation)
{
// This method runs from any thread, possibly
// concurrently with updateKey() and onSelected().
int oldInterestOps;
int newInterestOps;
boolean pending;
try (AutoLock l = _lock.lock())
{
pending = _updatePending;
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps | operation;
if (newInterestOps != oldInterestOps)
_desiredInterestOps = newInterestOps;
}
if (LOG.isDebugEnabled())
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (!pending && _selector != null)
_selector.submit(_updateKeyAction);
}
@Override
public Runnable onSelected()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps & ~readyOps;
_desiredInterestOps = newInterestOps;
}
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
// return task to complete the job
Runnable task = fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
if (LOG.isDebugEnabled())
LOG.debug("task {}", task);
return task;
}
private void updateKeyAction(Selector selector)
{
updateKey();
}
@Override
public void updateKey()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
try
{
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = false;
oldInterestOps = _currentInterestOps;
newInterestOps = _desiredInterestOps;
if (oldInterestOps != newInterestOps)
{
_currentInterestOps = newInterestOps;
_key.interestOps(newInterestOps);
}
}
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
catch (CancelledKeyException x)
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring key update for cancelled key {}", this, x);
close();
}
catch (Throwable x)
{
LOG.warn("Ignoring key update for {}", this, x);
close();
}
}
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
@Override
public String toEndPointString()
{
// We do a best effort to print the right toString() and that's it.
return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
super.toEndPointString(),
_currentInterestOps,
_desiredInterestOps,
ManagedSelector.safeInterestOps(_key),
ManagedSelector.safeReadyOps(_key));
}
private abstract class RunnableCloseable implements Invocable.Task, Closeable
{
final String _operation;
private RunnableCloseable(String operation)
{
_operation = operation;
}
@Override
public void close()
{
try
{
SelectableChannelEndPoint.this.close();
}
catch (Throwable x)
{
LOG.warn("Unable to close {}", SelectableChannelEndPoint.this, x);
}
}
@Override
public String toString()
{
return String.format("%s:%s:%s", SelectableChannelEndPoint.this, _operation, getInvocationType());
}
}
}

View File

@ -13,164 +13,33 @@
package org.eclipse.jetty.io;
import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Channel End Point.
* <p>Holds the channel and socket for an NIO endpoint.
* <p>An {@link EndPoint} implementation based on {@link SocketChannel}.</p>
*/
public class SocketChannelEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable
public class SocketChannelEndPoint extends SelectableChannelEndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(SocketChannelEndPoint.class);
private final AutoLock _lock = new AutoLock();
private final SocketChannel _channel;
private final ManagedSelector _selector;
private SelectionKey _key;
private boolean _updatePending;
// The current value for interestOps.
private int _currentInterestOps;
// The desired value for interestOps.
private int _desiredInterestOps;
private abstract class RunnableTask implements Runnable, Invocable
{
final String _operation;
protected RunnableTask(String op)
{
_operation = op;
}
@Override
public String toString()
{
return String.format("%s:%s:%s", SocketChannelEndPoint.this, _operation, getInvocationType());
}
}
private abstract class RunnableCloseable extends RunnableTask implements Closeable
{
protected RunnableCloseable(String op)
{
super(op);
}
@Override
public void close()
{
try
{
SocketChannelEndPoint.this.close();
}
catch (Throwable x)
{
LOG.warn("Unable to close {}", SocketChannelEndPoint.this, x);
}
}
}
private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction;
private final Runnable _runFillable = new RunnableCloseable("runFillable")
{
@Override
public InvocationType getInvocationType()
{
return getFillInterest().getCallbackInvocationType();
}
@Override
public void run()
{
getFillInterest().fillable();
}
};
private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite")
{
@Override
public InvocationType getInvocationType()
{
return getWriteFlusher().getCallbackInvocationType();
}
@Override
public void run()
{
getWriteFlusher().completeWrite();
}
@Override
public String toString()
{
return String.format("%s:%s:%s->%s", SocketChannelEndPoint.this, _operation, getInvocationType(), getWriteFlusher());
}
};
private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable")
{
@Override
public InvocationType getInvocationType()
{
InvocationType fillT = getFillInterest().getCallbackInvocationType();
InvocationType flushT = getWriteFlusher().getCallbackInvocationType();
if (fillT == flushT)
return fillT;
if (fillT == InvocationType.EITHER && flushT == InvocationType.NON_BLOCKING)
return InvocationType.EITHER;
if (fillT == InvocationType.NON_BLOCKING && flushT == InvocationType.EITHER)
return InvocationType.EITHER;
return InvocationType.BLOCKING;
}
@Override
public void run()
{
getWriteFlusher().completeWrite();
getFillInterest().fillable();
}
};
public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler);
_channel = channel;
_selector = selector;
_key = key;
super(scheduler, channel, selector, key);
}
@Override
public SocketAddress getLocalSocketAddress()
public SocketChannel getChannel()
{
try
{
return _channel.getLocalAddress();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not retrieve local socket address", x);
return null;
}
return (SocketChannel)super.getChannel();
}
@Override
@ -178,68 +47,26 @@ public class SocketChannelEndPoint extends AbstractEndPoint implements ManagedSe
{
try
{
return _channel.getRemoteAddress();
return getChannel().getRemoteAddress();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not retrieve remote socket address", x);
LOG.trace("Could not retrieve remote socket address", x);
return null;
}
}
@Override
public boolean isOpen()
{
return _channel.isOpen();
}
@Override
protected void doShutdownOutput()
{
try
{
Socket socket = _channel.socket();
if (!socket.isOutputShutdown())
socket.shutdownOutput();
getChannel().shutdownOutput();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not shutdown output for {}", _channel, x);
}
}
@Override
public void doClose()
{
if (LOG.isDebugEnabled())
LOG.debug("doClose {}", this);
try
{
_channel.close();
}
catch (IOException e)
{
LOG.debug("Unable to close channel", e);
}
finally
{
super.doClose();
}
}
@Override
public void onClose(Throwable cause)
{
try
{
super.onClose(cause);
}
finally
{
if (_selector != null)
_selector.destroyEndPoint(this, cause);
LOG.debug("Could not shutdown output for {}", getChannel(), x);
}
}
@ -253,7 +80,7 @@ public class SocketChannelEndPoint extends AbstractEndPoint implements ManagedSe
int filled;
try
{
filled = _channel.read(buffer);
filled = getChannel().read(buffer);
if (filled > 0)
notIdle();
else if (filled == -1)
@ -280,7 +107,7 @@ public class SocketChannelEndPoint extends AbstractEndPoint implements ManagedSe
long flushed;
try
{
flushed = _channel.write(buffers);
flushed = getChannel().write(buffers);
if (LOG.isDebugEnabled())
LOG.debug("flushed {} {}", flushed, this);
}
@ -300,150 +127,4 @@ public class SocketChannelEndPoint extends AbstractEndPoint implements ManagedSe
return true;
}
public SocketChannel getChannel()
{
return _channel;
}
@Override
public Object getTransport()
{
return _channel;
}
@Override
protected void needsFillInterest()
{
changeInterests(SelectionKey.OP_READ);
}
@Override
protected void onIncompleteFlush()
{
changeInterests(SelectionKey.OP_WRITE);
}
@Override
public Runnable onSelected()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps & ~readyOps;
_desiredInterestOps = newInterestOps;
}
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
// return task to complete the job
Runnable task = fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
if (LOG.isDebugEnabled())
LOG.debug("task {}", task);
return task;
}
private void updateKeyAction(Selector selector)
{
updateKey();
}
@Override
public void updateKey()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
try
{
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = false;
oldInterestOps = _currentInterestOps;
newInterestOps = _desiredInterestOps;
if (oldInterestOps != newInterestOps)
{
_currentInterestOps = newInterestOps;
_key.interestOps(newInterestOps);
}
}
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
catch (CancelledKeyException x)
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring key update for cancelled key {}", this, x);
close();
}
catch (Throwable x)
{
LOG.warn("Ignoring key update for {}", this, x);
close();
}
}
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
private void changeInterests(int operation)
{
// This method runs from any thread, possibly
// concurrently with updateKey() and onSelected().
int oldInterestOps;
int newInterestOps;
boolean pending;
try (AutoLock l = _lock.lock())
{
pending = _updatePending;
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps | operation;
if (newInterestOps != oldInterestOps)
_desiredInterestOps = newInterestOps;
}
if (LOG.isDebugEnabled())
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (!pending && _selector != null)
_selector.submit(_updateKeyAction);
}
@Override
public String toEndPointString()
{
// We do a best effort to print the right toString() and that's it.
return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
super.toEndPointString(),
_currentInterestOps,
_desiredInterestOps,
ManagedSelector.safeInterestOps(_key),
ManagedSelector.safeReadyOps(_key));
}
}

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritePendingException;
@ -204,18 +205,15 @@ public abstract class WriteFlusher
private class PendingState extends State
{
private final Callback _callback;
private final SocketAddress _address;
private final ByteBuffer[] _buffers;
private PendingState(ByteBuffer[] buffers, Callback callback)
private PendingState(Callback callback, SocketAddress address, ByteBuffer[] buffers)
{
super(StateType.PENDING);
_buffers = buffers;
_callback = callback;
}
public ByteBuffer[] getBuffers()
{
return _buffers;
_address = address;
_buffers = buffers;
}
InvocationType getCallbackInvocationType()
@ -252,6 +250,11 @@ public abstract class WriteFlusher
* @throws WritePendingException if unable to write due to prior pending write
*/
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
write(callback, null, buffers);
}
public void write(Callback callback, SocketAddress address, ByteBuffer... buffers) throws WritePendingException
{
Objects.requireNonNull(callback);
@ -269,13 +272,13 @@ public abstract class WriteFlusher
try
{
buffers = flush(buffers);
buffers = flush(address, buffers);
if (buffers != null)
{
if (DEBUG)
LOG.debug("flushed incomplete");
PendingState pending = new PendingState(buffers, callback);
PendingState pending = new PendingState(callback, address, buffers);
if (updateState(__WRITING, pending))
onIncompleteFlush();
else
@ -368,16 +371,17 @@ public abstract class WriteFlusher
Callback callback = pending._callback;
try
{
ByteBuffer[] buffers = pending.getBuffers();
ByteBuffer[] buffers = pending._buffers;
SocketAddress address = pending._address;
buffers = flush(buffers);
buffers = flush(address, buffers);
if (buffers != null)
{
if (DEBUG)
LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers));
if (buffers != pending.getBuffers())
pending = new PendingState(buffers, callback);
if (buffers != pending._buffers)
pending = new PendingState(callback, address, buffers);
if (updateState(__COMPLETING, pending))
onIncompleteFlush();
else
@ -408,13 +412,13 @@ public abstract class WriteFlusher
* @return The unflushed buffers, or null if all flushed
* @throws IOException if unable to flush
*/
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
protected ByteBuffer[] flush(SocketAddress address, ByteBuffer[] buffers) throws IOException
{
boolean progress = true;
while (progress && buffers != null)
{
long before = BufferUtil.remaining(buffers);
boolean flushed = _endPoint.flush(buffers);
boolean flushed = address == null ? _endPoint.flush(buffers) : ((DatagramChannelEndPoint)_endPoint).send(address, buffers);
long after = BufferUtil.remaining(buffers);
long written = before - after;

View File

@ -124,23 +124,6 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
private FlushState _flushState = FlushState.IDLE;
private FillState _fillState = FillState.IDLE;
private boolean _underflown;
private abstract class RunnableTask implements Runnable, Invocable
{
private final String _operation;
protected RunnableTask(String op)
{
_operation = op;
}
@Override
public String toString()
{
return String.format("SSL:%s:%s:%s", SslConnection.this, _operation, getInvocationType());
}
}
private final Runnable _runFillable = new RunnableTask("runFillable")
{
@Override
@ -155,7 +138,6 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
return _decryptedEndPoint.getFillInterest().getCallbackInvocationType();
}
};
private final Callback _sslReadCallback = new Callback()
{
@Override
@ -1541,7 +1523,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
@Override
public String toString()
{
return super.toEndPointString();
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), toEndPointString());
}
private final class IncompleteWriteCallback implements Callback, Invocable
@ -1612,4 +1594,20 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
}
}
}
private abstract class RunnableTask implements Invocable.Task
{
private final String _operation;
protected RunnableTask(String op)
{
_operation = op;
}
@Override
public String toString()
{
return String.format("SSL:%s:%s:%s", SslConnection.this, _operation, getInvocationType());
}
}
}

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.io;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.Arrays;
@ -371,13 +372,13 @@ public class WriteFlusherTest
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
protected ByteBuffer[] flush(SocketAddress address, ByteBuffer[] buffers) throws IOException
{
try
{
flushLatch.countDown();
Thread.sleep(2000);
return super.flush(buffers);
return super.flush(address, buffers);
}
catch (InterruptedException x)
{
@ -412,10 +413,9 @@ public class WriteFlusherTest
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected ByteBuffer[] flush(ByteBuffer[] buffers)
throws IOException
protected ByteBuffer[] flush(SocketAddress address, ByteBuffer[] buffers) throws IOException
{
ByteBuffer[] result = super.flush(buffers);
ByteBuffer[] result = super.flush(address, buffers);
boolean notified = onFail(new Throwable());
assertTrue(notified);
return result;

View File

@ -18,7 +18,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.Channel;
@ -362,6 +361,19 @@ public class ServerConnector extends AbstractNetworkConnector
}
}
private <T> void setSocketOption(SocketChannel channel, SocketOption<T> option, T value)
{
try
{
channel.setOption(option, value);
}
catch (Throwable x)
{
if (LOG.isTraceEnabled())
LOG.trace("Could not configure {} to {} on {}", option, value, channel, x);
}
}
@Override
public void close()
{
@ -402,27 +414,14 @@ public class ServerConnector extends AbstractNetworkConnector
private void accepted(SocketChannel channel) throws IOException
{
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
setSocketOption(channel, StandardSocketOptions.TCP_NODELAY, _acceptedTcpNoDelay);
if (_acceptedReceiveBufferSize > -1)
setSocketOption(channel, StandardSocketOptions.SO_RCVBUF, _acceptedReceiveBufferSize);
if (_acceptedSendBufferSize > -1)
setSocketOption(channel, StandardSocketOptions.SO_SNDBUF, _acceptedSendBufferSize);
_manager.accept(channel);
}
protected void configure(Socket socket)
{
try
{
socket.setTcpNoDelay(_acceptedTcpNoDelay);
if (_acceptedReceiveBufferSize > -1)
socket.setReceiveBufferSize(_acceptedReceiveBufferSize);
if (_acceptedSendBufferSize > -1)
socket.setSendBufferSize(_acceptedSendBufferSize);
}
catch (SocketException e)
{
LOG.trace("IGNORED", e);
}
}
@ManagedAttribute("The Selector Manager")
public SelectorManager getSelectorManager()
{

View File

@ -39,6 +39,24 @@ public interface Callback extends Invocable
}
};
/**
* <p>Completes this callback with the given {@link CompletableFuture}.</p>
* <p>When the CompletableFuture completes normally, this callback is succeeded;
* when the CompletableFuture completes exceptionally, this callback is failed.</p>
*
* @param completable the CompletableFuture that completes this callback
*/
default void completeWith(CompletableFuture<?> completable)
{
completable.whenComplete((o, x) ->
{
if (x == null)
succeeded();
else
failed(x);
});
}
/**
* <p>Callback invoked when the operation completes.</p>
*
@ -168,6 +186,26 @@ public interface Callback extends Invocable
};
}
/**
* <p>Creates a Callback with the given {@code invocationType},
* that runs the given {@code Runnable} when it succeeds or fails.</p>
*
* @param invocationType the invocation type of the returned Callback
* @param completed the Runnable to run when the callback either succeeds or fails
* @return a new Callback with the given invocation type
*/
static Callback from(InvocationType invocationType, Runnable completed)
{
return new Completing(invocationType)
{
@Override
public void completed()
{
completed.run();
}
};
}
/**
* Creates a nested callback that runs completed after
* completing the nested callback.
@ -283,8 +321,23 @@ public interface Callback extends Invocable
};
}
/**
* <p>A Callback implementation that calls the {@link #completed()} method when it either succeeds or fails.</p>
*/
class Completing implements Callback
{
private final InvocationType invocationType;
public Completing()
{
this(InvocationType.BLOCKING);
}
public Completing(InvocationType invocationType)
{
this.invocationType = invocationType;
}
@Override
public void succeeded()
{
@ -297,6 +350,12 @@ public interface Callback extends Invocable
completed();
}
@Override
public InvocationType getInvocationType()
{
return invocationType;
}
public void completed()
{
}

View File

@ -870,11 +870,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
return false;
T pooled = getPooled();
int maxUsageCount = getMaxUsageCount(pooled);
if (maxUsageCount > 0 && usageCount >= maxUsageCount)
return false;
int maxMultiplexed = getMaxMultiplex(pooled);
if (maxMultiplexed > 0 && multiplexCount >= maxMultiplexed)
return false;
if (maxUsageCount > 0 && usageCount >= maxUsageCount)
return false;
// Prevent overflowing the usage counter by capping it at Integer.MAX_VALUE.
int newUsageCount = usageCount == Integer.MAX_VALUE ? Integer.MAX_VALUE : usageCount + 1;
@ -911,7 +911,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
break;
}
int currentMaxUsageCount = maxUsage;
int currentMaxUsageCount = getMaxUsageCount(getPooled());
boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
return !(overUsed && newMultiplexCount == 0);
}

View File

@ -26,6 +26,17 @@ import org.slf4j.LoggerFactory;
*/
public interface Promise<C>
{
default void completeWith(CompletableFuture<C> cf)
{
cf.whenComplete((c, x) ->
{
if (x == null)
succeeded(c);
else
failed(x);
});
}
/**
* <p>Callback invoked when the operation completes.</p>
*

View File

@ -701,6 +701,16 @@ public class StringUtil
return str == null || str.isEmpty();
}
/**
* Get the length of a string where a null string is length 0.
* @param s the string.
* @return the length of the string.
*/
public static int getLength(String s)
{
return (s == null) ? 0 : s.length();
}
/**
* Test if a string is not null and contains at least 1 non-whitespace characters in it.
* <p>

View File

@ -794,6 +794,11 @@ public abstract class SslContextFactory extends AbstractLifeCycle implements Dum
_validatePeerCerts = validatePeerCerts;
}
public String getKeyStorePassword()
{
return _keyStorePassword == null ? null : _keyStorePassword.toString();
}
/**
* @param password The password for the key store. If null is passed and
* a keystore is set, then
@ -806,6 +811,11 @@ public abstract class SslContextFactory extends AbstractLifeCycle implements Dum
_keyStorePassword = password == null ? getPassword(PASSWORD_PROPERTY) : newPassword(password);
}
public String getKeyManagerPassword()
{
return _keyManagerPassword == null ? null : _keyManagerPassword.toString();
}
/**
* @param password The password (if any) for the specific key within the key store.
* If null is passed and the {@value #KEYPASSWORD_PROPERTY} system property is set,

View File

@ -31,12 +31,64 @@ import java.util.concurrent.Callable;
*/
public interface Invocable
{
static ThreadLocal<Boolean> __nonBlocking = new ThreadLocal<>();
enum InvocationType
{
BLOCKING, NON_BLOCKING, EITHER
}
static ThreadLocal<Boolean> __nonBlocking = new ThreadLocal<>();
/**
* <p>A task with an {@link InvocationType}.</p>
*/
interface Task extends Invocable, Runnable
{
}
/**
* <p>A {@link Runnable} decorated with an {@link InvocationType}.</p>
*/
class ReadyTask implements Task
{
private final InvocationType type;
private final Runnable task;
public ReadyTask(InvocationType type, Runnable task)
{
this.type = type;
this.task = task;
}
@Override
public void run()
{
task.run();
}
@Override
public InvocationType getInvocationType()
{
return type;
}
@Override
public String toString()
{
return String.format("%s@%x[%s|%s]", getClass().getSimpleName(), hashCode(), type, task);
}
}
/**
* <p>Creates a {@link Task} from the given InvocationType and Runnable.</p>
*
* @param type the InvocationType
* @param task the Runnable
* @return a new Task
*/
public static Task from(InvocationType type, Runnable task)
{
return new ReadyTask(type, task);
}
/**
* Test if the current thread has been tagged as non blocking

View File

@ -60,7 +60,7 @@ public class AdaptiveExecutionStrategyTest
{
AtomicReference<Throwable> detector = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(2);
BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
BlockingQueue<Invocable.ReadyTask> tasks = new LinkedBlockingQueue<>();
startAES(() ->
{
boolean proceed = detector.compareAndSet(null, new Throwable());
@ -90,7 +90,7 @@ public class AdaptiveExecutionStrategyTest
// Start production in another thread.
aes.dispatch();
tasks.offer(new Task(() ->
tasks.offer(new Invocable.ReadyTask(Invocable.InvocationType.BLOCKING, () ->
{
try
{
@ -106,7 +106,7 @@ public class AdaptiveExecutionStrategyTest
{
x.printStackTrace();
}
}, Invocable.InvocationType.BLOCKING));
}));
// Wait until AES is idle.
while (!aes.isIdle())
@ -117,28 +117,4 @@ public class AdaptiveExecutionStrategyTest
assertNull(detector.get());
}
}
private static class Task implements Runnable, Invocable
{
private final Runnable task;
private final InvocationType invocationType;
private Task(Runnable task, InvocationType invocationType)
{
this.task = task;
this.invocationType = invocationType;
}
@Override
public void run()
{
task.run();
}
@Override
public InvocationType getInvocationType()
{
return invocationType;
}
}
}

View File

@ -65,7 +65,7 @@ public class TestConnection implements Producer
return handler;
}
private class Handler implements Runnable, Invocable
private class Handler implements Invocable.Task
{
private final Map<String, String> _request;
private final CompletableFuture<String> _futureResult;