move datagram endpoint to common module

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-23 12:04:23 +01:00 committed by Simone Bordet
parent ca4562ca8e
commit 669ae8ff1b
9 changed files with 47 additions and 458 deletions

View File

@ -0,0 +1,24 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
module org.eclipse.jetty.http3.client
{
exports org.eclipse.jetty.http3.client;
requires org.eclipse.jetty.http3.common;
requires org.eclipse.jetty.http3.quiche;
requires org.eclipse.jetty.client;
requires org.eclipse.jetty.io;
requires org.eclipse.jetty.util;
requires org.slf4j;
}

View File

@ -30,6 +30,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnector;
@ -311,7 +312,7 @@ public class ClientDatagramConnector extends ContainerLifeCycle implements IClie
protected EndPoint newEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
return new ClientDatagramEndPoint(channel, selector, selectionKey, getScheduler());
return new QuicDatagramEndPoint(channel, selector, selectionKey, getScheduler());
}
protected void connectFailed(Throwable failure, Map<String, Object> context)

View File

@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
@ -123,7 +124,7 @@ public class QuicConnection extends AbstractConnection
return;
}
InetSocketAddress remoteAddress = ClientDatagramEndPoint.INET_ADDRESS_ARGUMENT.pop();
InetSocketAddress remoteAddress = QuicDatagramEndPoint.INET_ADDRESS_ARGUMENT.pop();
if (LOG.isDebugEnabled())
LOG.debug("decoded peer IP address: {}, ciphertext packet size: {}", remoteAddress, cipherBuffer.remaining());
@ -201,7 +202,7 @@ public class QuicConnection extends AbstractConnection
if (entry == null)
return Action.IDLE;
ClientDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(entry.address);
QuicDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(entry.address);
getEndPoint().write(this, entry.buffers);
return Action.SCHEDULED;
}

View File

@ -13,6 +13,7 @@
module org.eclipse.jetty.http3.common
{
exports org.eclipse.jetty.http3.common;
requires transitive org.eclipse.jetty.http3.quiche;
requires transitive org.eclipse.jetty.io;

View File

@ -1,4 +1,4 @@
//
package org.eclipse.jetty.http3.common;//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
@ -11,8 +11,6 @@
// ========================================================================
//
package org.eclipse.jetty.http3.client;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -33,9 +31,9 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientDatagramEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable
public class QuicDatagramEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable
{
private static final Logger LOG = LoggerFactory.getLogger(ClientDatagramEndPoint.class);
private static final Logger LOG = LoggerFactory.getLogger(QuicDatagramEndPoint.class);
public static InetAddressArgument INET_ADDRESS_ARGUMENT = new InetAddressArgument();
@ -61,7 +59,7 @@ public class ClientDatagramEndPoint extends AbstractEndPoint implements ManagedS
@Override
public String toString()
{
return String.format("%s:%s:%s", ClientDatagramEndPoint.this, _operation, getInvocationType());
return String.format("%s:%s:%s", QuicDatagramEndPoint.this, _operation, getInvocationType());
}
}
@ -77,11 +75,11 @@ public class ClientDatagramEndPoint extends AbstractEndPoint implements ManagedS
{
try
{
ClientDatagramEndPoint.this.close();
QuicDatagramEndPoint.this.close();
}
catch (Throwable x)
{
LOG.warn("Unable to close {}", ClientDatagramEndPoint.this, x);
LOG.warn("Unable to close {}", QuicDatagramEndPoint.this, x);
}
}
}
@ -120,7 +118,7 @@ public class ClientDatagramEndPoint extends AbstractEndPoint implements ManagedS
@Override
public String toString()
{
return String.format("%s:%s:%s->%s", ClientDatagramEndPoint.this, _operation, getInvocationType(), getWriteFlusher());
return String.format("%s:%s:%s->%s", QuicDatagramEndPoint.this, _operation, getInvocationType(), getWriteFlusher());
}
};
@ -151,7 +149,7 @@ public class ClientDatagramEndPoint extends AbstractEndPoint implements ManagedS
}
};
public ClientDatagramEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
public QuicDatagramEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler);
_channel = channel;

View File

@ -15,6 +15,7 @@ module org.eclipse.jetty.http3.server
{
exports org.eclipse.jetty.http3.server;
requires org.eclipse.jetty.http3.common;
requires org.eclipse.jetty.http3.quiche;
requires org.eclipse.jetty.io;
requires org.eclipse.jetty.server;

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
import org.eclipse.jetty.http3.quiche.QuicheConfig;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
@ -46,7 +47,7 @@ public class QuicConnection extends AbstractConnection
private final ByteBufferPool byteBufferPool;
private final Flusher flusher = new Flusher();
public QuicConnection(Connector connector, ServerDatagramEndPoint endp)
public QuicConnection(Connector connector, QuicDatagramEndPoint endp)
{
super(endp, connector.getExecutor());
this.connector = connector;
@ -131,7 +132,7 @@ public class QuicConnection extends AbstractConnection
return;
}
InetSocketAddress remoteAddress = ServerDatagramEndPoint.INET_ADDRESS_ARGUMENT.pop();
InetSocketAddress remoteAddress = QuicDatagramEndPoint.INET_ADDRESS_ARGUMENT.pop();
if (LOG.isDebugEnabled())
LOG.debug("decoded peer IP address: {}, ciphertext packet size: {}", remoteAddress, cipherBuffer.remaining());
@ -164,7 +165,7 @@ public class QuicConnection extends AbstractConnection
}
BufferUtil.flipToFlush(negotiationBuffer, pos);
ServerDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(remoteAddress);
QuicDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(remoteAddress);
getEndPoint().write(Callback.from(() -> byteBufferPool.release(negotiationBuffer)), negotiationBuffer);
if (LOG.isDebugEnabled())
LOG.debug("QUIC connection negotiation packet sent");
@ -225,7 +226,7 @@ public class QuicConnection extends AbstractConnection
if (entry == null)
return Action.IDLE;
ServerDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(entry.address);
QuicDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(entry.address);
getEndPoint().write(this, entry.buffers);
return Action.SCHEDULED;
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -180,13 +181,13 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new ServerDatagramEndPoint((DatagramChannel)channel, selector, selectionKey, getScheduler());
return new QuicDatagramEndPoint((DatagramChannel)channel, selector, selectionKey, getScheduler());
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
return new QuicConnection(ServerDatagramConnector.this, (ServerDatagramEndPoint)endpoint);
return new QuicConnection(ServerDatagramConnector.this, (QuicDatagramEndPoint)endpoint);
}
@Override

View File

@ -1,439 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.server;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Objects;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerDatagramEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable
{
private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class);
public static InetAddressArgument INET_ADDRESS_ARGUMENT = new InetAddressArgument();
private final AutoLock _lock = new AutoLock();
private final DatagramChannel _channel;
private final ManagedSelector _selector;
private SelectionKey _key;
private boolean _updatePending;
// The current value for interestOps.
private int _currentInterestOps;
// The desired value for interestOps.
private int _desiredInterestOps;
private abstract class RunnableTask implements Runnable, Invocable
{
final String _operation;
protected RunnableTask(String op)
{
_operation = op;
}
@Override
public String toString()
{
return String.format("%s:%s:%s", ServerDatagramEndPoint.this, _operation, getInvocationType());
}
}
private abstract class RunnableCloseable extends RunnableTask implements Closeable
{
protected RunnableCloseable(String op)
{
super(op);
}
@Override
public void close()
{
try
{
ServerDatagramEndPoint.this.close();
}
catch (Throwable x)
{
LOG.warn("Unable to close {}", ServerDatagramEndPoint.this, x);
}
}
}
private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction;
private final Runnable _runFillable = new RunnableCloseable("runFillable")
{
@Override
public InvocationType getInvocationType()
{
return getFillInterest().getCallbackInvocationType();
}
@Override
public void run()
{
getFillInterest().fillable();
}
};
private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite")
{
@Override
public InvocationType getInvocationType()
{
return getWriteFlusher().getCallbackInvocationType();
}
@Override
public void run()
{
getWriteFlusher().completeWrite();
}
@Override
public String toString()
{
return String.format("%s:%s:%s->%s", ServerDatagramEndPoint.this, _operation, getInvocationType(), getWriteFlusher());
}
};
private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable")
{
@Override
public InvocationType getInvocationType()
{
InvocationType fillT = getFillInterest().getCallbackInvocationType();
InvocationType flushT = getWriteFlusher().getCallbackInvocationType();
if (fillT == flushT)
return fillT;
if (fillT == InvocationType.EITHER && flushT == InvocationType.NON_BLOCKING)
return InvocationType.EITHER;
if (fillT == InvocationType.NON_BLOCKING && flushT == InvocationType.EITHER)
return InvocationType.EITHER;
return InvocationType.BLOCKING;
}
@Override
public void run()
{
getWriteFlusher().completeWrite();
getFillInterest().fillable();
}
};
public ServerDatagramEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler);
_channel = channel;
_selector = selector;
_key = key;
}
@Override
public InetSocketAddress getLocalAddress()
{
return (InetSocketAddress)_channel.socket().getLocalSocketAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
{
return null;
}
@Override
public boolean isOpen()
{
return _channel.isOpen();
}
@Override
protected void doShutdownOutput()
{
}
@Override
public void doClose()
{
if (LOG.isDebugEnabled())
LOG.debug("doClose {}", this);
try
{
_channel.close();
}
catch (IOException e)
{
LOG.debug("Unable to close channel", e);
}
finally
{
super.doClose();
}
}
@Override
public void onClose(Throwable cause)
{
try
{
super.onClose(cause);
}
finally
{
if (_selector != null)
_selector.destroyEndPoint(this, cause);
}
}
@Override
public int fill(ByteBuffer buffer) throws IOException
{
if (isInputShutdown())
return -1;
int pos = BufferUtil.flipToFill(buffer);
InetSocketAddress peer = (InetSocketAddress)_channel.receive(buffer);
if (peer == null)
{
BufferUtil.flipToFlush(buffer, pos);
return 0;
}
INET_ADDRESS_ARGUMENT.push(peer);
notIdle();
BufferUtil.flipToFlush(buffer, pos);
int filled = buffer.remaining();
if (LOG.isDebugEnabled())
LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer));
return filled;
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
boolean flushedAll = true;
long flushed = 0;
try
{
InetSocketAddress peer = INET_ADDRESS_ARGUMENT.pop();
if (LOG.isDebugEnabled())
LOG.debug("flushing {} buffer(s) to {}", buffers.length - 1, peer);
for (ByteBuffer buffer : buffers)
{
int sent = _channel.send(buffer, peer);
if (sent == 0)
{
flushedAll = false;
break;
}
flushed += sent;
}
if (LOG.isDebugEnabled())
LOG.debug("flushed {} byte(s), all flushed? {} - {}", flushed, flushedAll, this);
}
catch (IOException e)
{
throw new EofException(e);
}
if (flushed > 0)
notIdle();
return flushedAll;
}
public DatagramChannel getChannel()
{
return _channel;
}
@Override
public Object getTransport()
{
return _channel;
}
@Override
protected void needsFillInterest()
{
changeInterests(SelectionKey.OP_READ);
}
@Override
protected void onIncompleteFlush()
{
changeInterests(SelectionKey.OP_WRITE);
}
@Override
public Runnable onSelected()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps & ~readyOps;
_desiredInterestOps = newInterestOps;
}
boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
// return task to complete the job
Runnable task = fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);
if (LOG.isDebugEnabled())
LOG.debug("task {}", task);
return task;
}
private void updateKeyAction(Selector selector)
{
updateKey();
}
@Override
public void updateKey()
{
// This method runs from the selector thread,
// possibly concurrently with changeInterests(int).
try
{
int oldInterestOps;
int newInterestOps;
try (AutoLock l = _lock.lock())
{
_updatePending = false;
oldInterestOps = _currentInterestOps;
newInterestOps = _desiredInterestOps;
if (oldInterestOps != newInterestOps)
{
_currentInterestOps = newInterestOps;
_key.interestOps(newInterestOps);
}
}
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
catch (CancelledKeyException x)
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring key update for cancelled key {}", this, x);
close();
}
catch (Throwable x)
{
LOG.warn("Ignoring key update for {}", this, x);
close();
}
}
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
private void changeInterests(int operation)
{
// This method runs from any thread, possibly
// concurrently with updateKey() and onSelected().
int oldInterestOps;
int newInterestOps;
boolean pending;
try (AutoLock l = _lock.lock())
{
pending = _updatePending;
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps | operation;
if (newInterestOps != oldInterestOps)
_desiredInterestOps = newInterestOps;
}
if (LOG.isDebugEnabled())
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (!pending && _selector != null)
_selector.submit(_updateKeyAction);
}
@Override
public String toEndPointString()
{
// We do a best effort to print the right toString() and that's it.
return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
super.toEndPointString(),
_currentInterestOps,
_desiredInterestOps,
ManagedSelector.safeInterestOps(_key),
ManagedSelector.safeReadyOps(_key));
}
public final static class InetAddressArgument
{
private final ThreadLocal<InetSocketAddress> threadLocal = new ThreadLocal<>();
public void push(InetSocketAddress inetSocketAddress)
{
Objects.requireNonNull(inetSocketAddress);
threadLocal.set(inetSocketAddress);
}
public InetSocketAddress pop()
{
InetSocketAddress inetSocketAddress = threadLocal.get();
Objects.requireNonNull(inetSocketAddress);
threadLocal.remove();
return inetSocketAddress;
}
}
}