replace InetSocketAddress decoding/encoding with side-stack argument passing
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
62eed58afb
commit
ee900b1a8d
|
@ -1,81 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http3.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
||||
public class AddressCodec
|
||||
{
|
||||
public static final int ENCODED_ADDRESS_LENGTH = 19;
|
||||
|
||||
public static ByteBuffer encodeInetSocketAddress(ByteBufferPool byteBufferPool, InetSocketAddress remoteAddress) throws IOException
|
||||
{
|
||||
ByteBuffer addressBuffer = byteBufferPool.acquire(ENCODED_ADDRESS_LENGTH, true);
|
||||
try
|
||||
{
|
||||
int pos = BufferUtil.flipToFill(addressBuffer);
|
||||
encodeInetSocketAddress(addressBuffer, remoteAddress);
|
||||
BufferUtil.flipToFlush(addressBuffer, pos);
|
||||
return addressBuffer;
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
byteBufferPool.release(addressBuffer);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
||||
public static InetSocketAddress decodeInetSocketAddress(ByteBuffer buffer) throws IOException
|
||||
{
|
||||
int headerPosition = buffer.position();
|
||||
byte ipVersion = buffer.get();
|
||||
byte[] address;
|
||||
if (ipVersion == 4)
|
||||
address = new byte[4];
|
||||
else if (ipVersion == 6)
|
||||
address = new byte[16];
|
||||
else
|
||||
throw new IOException("Unsupported IP version: " + ipVersion);
|
||||
buffer.get(address);
|
||||
int port = buffer.getChar();
|
||||
buffer.position(headerPosition + ENCODED_ADDRESS_LENGTH);
|
||||
return new InetSocketAddress(InetAddress.getByAddress(address), port);
|
||||
}
|
||||
|
||||
public static void encodeInetSocketAddress(ByteBuffer buffer, InetSocketAddress peer) throws IOException
|
||||
{
|
||||
int headerPosition = buffer.position();
|
||||
byte[] addressBytes = peer.getAddress().getAddress();
|
||||
int port = peer.getPort();
|
||||
byte ipVersion;
|
||||
if (peer.getAddress() instanceof Inet4Address)
|
||||
ipVersion = 4;
|
||||
else if (peer.getAddress() instanceof Inet6Address)
|
||||
ipVersion = 6;
|
||||
else
|
||||
throw new IOException("Unsupported address type: " + peer.getAddress().getClass());
|
||||
buffer.put(ipVersion);
|
||||
buffer.put(addressBytes);
|
||||
buffer.putChar((char)port);
|
||||
buffer.position(headerPosition + ENCODED_ADDRESS_LENGTH);
|
||||
}
|
||||
}
|
|
@ -103,7 +103,7 @@ public class QuicConnection extends AbstractConnection
|
|||
{
|
||||
try
|
||||
{
|
||||
ByteBuffer cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN + AddressCodec.ENCODED_ADDRESS_LENGTH, true);
|
||||
ByteBuffer cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
|
||||
while (true)
|
||||
{
|
||||
BufferUtil.clear(cipherBuffer);
|
||||
|
@ -124,7 +124,7 @@ public class QuicConnection extends AbstractConnection
|
|||
return;
|
||||
}
|
||||
|
||||
InetSocketAddress remoteAddress = AddressCodec.decodeInetSocketAddress(cipherBuffer);
|
||||
InetSocketAddress remoteAddress = ServerDatagramEndPoint.INET_ADDRESS_ARGUMENT.pop();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("decoded peer IP address: {}, ciphertext packet size: {}", remoteAddress, cipherBuffer.remaining());
|
||||
|
||||
|
@ -146,24 +146,19 @@ public class QuicConnection extends AbstractConnection
|
|||
QuicheConnection quicheConnection = QuicheConnection.tryAccept(quicheConfig, remoteAddress, cipherBuffer);
|
||||
if (quicheConnection == null)
|
||||
{
|
||||
ByteBuffer addressBuffer = AddressCodec.encodeInetSocketAddress(byteBufferPool, remoteAddress);
|
||||
ByteBuffer negotiationBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
|
||||
int pos = BufferUtil.flipToFill(negotiationBuffer);
|
||||
if (!QuicheConnection.negotiate(remoteAddress, cipherBuffer, negotiationBuffer))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("QUIC connection negotiation failed, dropping packet");
|
||||
byteBufferPool.release(addressBuffer);
|
||||
byteBufferPool.release(negotiationBuffer);
|
||||
continue;
|
||||
}
|
||||
BufferUtil.flipToFlush(negotiationBuffer, pos);
|
||||
|
||||
getEndPoint().write(Callback.from(() ->
|
||||
{
|
||||
byteBufferPool.release(addressBuffer);
|
||||
byteBufferPool.release(negotiationBuffer);
|
||||
}), addressBuffer, negotiationBuffer);
|
||||
ServerDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(remoteAddress);
|
||||
getEndPoint().write(Callback.from(() -> byteBufferPool.release(negotiationBuffer)), negotiationBuffer);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("QUIC connection negotiation packet sent");
|
||||
}
|
||||
|
@ -187,13 +182,16 @@ public class QuicConnection extends AbstractConnection
|
|||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("caught exception in onFillable loop", x);
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
public void write(Callback callback, ByteBuffer... buffers)
|
||||
{
|
||||
flusher.offer(callback, buffers);
|
||||
InetSocketAddress address = ServerDatagramEndPoint.INET_ADDRESS_ARGUMENT.pop();
|
||||
flusher.offer(callback, address, buffers);
|
||||
flusher.iterate();
|
||||
}
|
||||
|
||||
|
@ -203,11 +201,11 @@ public class QuicConnection extends AbstractConnection
|
|||
private final ArrayDeque<Entry> queue = new ArrayDeque<>();
|
||||
private Entry entry;
|
||||
|
||||
public void offer(Callback callback, ByteBuffer[] buffers)
|
||||
public void offer(Callback callback, InetSocketAddress address, ByteBuffer[] buffers)
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
queue.offer(new Entry(callback, buffers));
|
||||
queue.offer(new Entry(callback, address, buffers));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -221,6 +219,7 @@ public class QuicConnection extends AbstractConnection
|
|||
if (entry == null)
|
||||
return Action.IDLE;
|
||||
|
||||
ServerDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(entry.address);
|
||||
getEndPoint().write(this, entry.buffers);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
@ -249,11 +248,13 @@ public class QuicConnection extends AbstractConnection
|
|||
private class Entry
|
||||
{
|
||||
private final Callback callback;
|
||||
private final InetSocketAddress address;
|
||||
private final ByteBuffer[] buffers;
|
||||
|
||||
private Entry(Callback callback, ByteBuffer[] buffers)
|
||||
private Entry(Callback callback, InetSocketAddress address, ByteBuffer[] buffers)
|
||||
{
|
||||
this.callback = callback;
|
||||
this.address = address;
|
||||
this.buffers = buffers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -252,7 +252,6 @@ public class QuicSession
|
|||
private class Flusher extends IteratingCallback
|
||||
{
|
||||
private final CyclicTimeout timeout;
|
||||
private ByteBuffer addressBuffer;
|
||||
private ByteBuffer cipherBuffer;
|
||||
|
||||
public Flusher(Scheduler scheduler)
|
||||
|
@ -282,8 +281,7 @@ public class QuicSession
|
|||
protected Action process() throws IOException
|
||||
{
|
||||
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
|
||||
addressBuffer = AddressCodec.encodeInetSocketAddress(byteBufferPool, remoteAddress);
|
||||
cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN + AddressCodec.ENCODED_ADDRESS_LENGTH, true);
|
||||
cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
|
||||
int pos = BufferUtil.flipToFill(cipherBuffer);
|
||||
int drained = quicheConnection.drainCipherText(cipherBuffer);
|
||||
long nextTimeoutInMs = quicheConnection.nextTimeout();
|
||||
|
@ -304,7 +302,8 @@ public class QuicSession
|
|||
return Action.IDLE;
|
||||
}
|
||||
BufferUtil.flipToFlush(cipherBuffer, pos);
|
||||
connection.write(this, addressBuffer, cipherBuffer);
|
||||
ServerDatagramEndPoint.INET_ADDRESS_ARGUMENT.push(remoteAddress);
|
||||
connection.write(this, cipherBuffer);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
|
@ -312,7 +311,6 @@ public class QuicSession
|
|||
public void succeeded()
|
||||
{
|
||||
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
|
||||
byteBufferPool.release(addressBuffer);
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
super.succeeded();
|
||||
}
|
||||
|
@ -321,7 +319,6 @@ public class QuicSession
|
|||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
|
||||
byteBufferPool.release(addressBuffer);
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.nio.channels.CancelledKeyException;
|
|||
import java.nio.channels.DatagramChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractEndPoint;
|
||||
import org.eclipse.jetty.io.EofException;
|
||||
|
@ -36,6 +37,8 @@ public class ServerDatagramEndPoint extends AbstractEndPoint implements ManagedS
|
|||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class);
|
||||
|
||||
public static InetAddressArgument INET_ADDRESS_ARGUMENT = new InetAddressArgument();
|
||||
|
||||
private final AutoLock _lock = new AutoLock();
|
||||
private final DatagramChannel _channel;
|
||||
private final ManagedSelector _selector;
|
||||
|
@ -219,24 +222,17 @@ public class ServerDatagramEndPoint extends AbstractEndPoint implements ManagedS
|
|||
return -1;
|
||||
|
||||
int pos = BufferUtil.flipToFill(buffer);
|
||||
|
||||
buffer.position(pos + AddressCodec.ENCODED_ADDRESS_LENGTH);
|
||||
InetSocketAddress peer = (InetSocketAddress)_channel.receive(buffer);
|
||||
if (peer == null)
|
||||
{
|
||||
buffer.position(pos);
|
||||
BufferUtil.flipToFlush(buffer, pos);
|
||||
return 0;
|
||||
}
|
||||
INET_ADDRESS_ARGUMENT.push(peer);
|
||||
|
||||
notIdle();
|
||||
int finalPosition = buffer.position();
|
||||
buffer.position(pos);
|
||||
AddressCodec.encodeInetSocketAddress(buffer, peer);
|
||||
buffer.position(finalPosition);
|
||||
BufferUtil.flipToFlush(buffer, pos);
|
||||
|
||||
int filled = finalPosition - AddressCodec.ENCODED_ADDRESS_LENGTH;
|
||||
int filled = buffer.remaining();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("filled {} {}", filled, BufferUtil.toDetailString(buffer));
|
||||
return filled;
|
||||
|
@ -249,12 +245,11 @@ public class ServerDatagramEndPoint extends AbstractEndPoint implements ManagedS
|
|||
long flushed = 0;
|
||||
try
|
||||
{
|
||||
InetSocketAddress peer = AddressCodec.decodeInetSocketAddress(buffers[0]);
|
||||
InetSocketAddress peer = INET_ADDRESS_ARGUMENT.pop();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("flushing {} buffer(s) to {}", buffers.length - 1, peer);
|
||||
for (int i = 1; i < buffers.length; i++)
|
||||
for (ByteBuffer buffer : buffers)
|
||||
{
|
||||
ByteBuffer buffer = buffers[i];
|
||||
int sent = _channel.send(buffer, peer);
|
||||
if (sent == 0)
|
||||
{
|
||||
|
@ -422,4 +417,23 @@ public class ServerDatagramEndPoint extends AbstractEndPoint implements ManagedS
|
|||
ManagedSelector.safeInterestOps(_key),
|
||||
ManagedSelector.safeReadyOps(_key));
|
||||
}
|
||||
|
||||
public final static class InetAddressArgument
|
||||
{
|
||||
private final ThreadLocal<InetSocketAddress> threadLocal = new ThreadLocal<>();
|
||||
|
||||
public void push(InetSocketAddress inetSocketAddress)
|
||||
{
|
||||
Objects.requireNonNull(inetSocketAddress);
|
||||
threadLocal.set(inetSocketAddress);
|
||||
}
|
||||
|
||||
public InetSocketAddress pop()
|
||||
{
|
||||
InetSocketAddress inetSocketAddress = threadLocal.get();
|
||||
Objects.requireNonNull(inetSocketAddress);
|
||||
threadLocal.remove();
|
||||
return inetSocketAddress;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue