make QuicStreamEndPoint extend from AbstractEndPoint and handle finished streams

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-17 08:39:43 +01:00 committed by Simone Bordet
parent d12550f88a
commit f0725d09d0
2 changed files with 49 additions and 114 deletions

View File

@ -64,16 +64,19 @@ public class QuicSession
return flushed; return flushed;
} }
public boolean isInputShutdown(long streamId) public boolean isFinished(long streamId)
{ {
// TODO: return quicheConnection.isStreamFinished(streamId);
return false;
} }
public boolean isOutputShutdown(long streamId) public void sendFinished(long streamId) throws IOException
{ {
// TODO: quicheConnection.feedFinForStream(streamId);
return false; }
public void shutdownInput(long streamId) throws IOException
{
quicheConnection.shutdownStream(streamId, false);
} }
public void shutdownOutput(long streamId) throws IOException public void shutdownOutput(long streamId) throws IOException
@ -81,6 +84,11 @@ public class QuicSession
quicheConnection.shutdownStream(streamId, true); quicheConnection.shutdownStream(streamId, true);
} }
public void onClose(long streamId)
{
endpoints.remove(streamId);
}
InetSocketAddress getLocalAddress() InetSocketAddress getLocalAddress()
{ {
return connection.getEndPoint().getLocalAddress(); return connection.getEndPoint().getLocalAddress();

View File

@ -16,50 +16,21 @@ package org.eclipse.jetty.http3.server;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.FillInterest;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class QuicStreamEndPoint extends IdleTimeout implements EndPoint public class QuicStreamEndPoint extends AbstractEndPoint
{ {
private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class); private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class);
private final long createdTimeStamp = System.currentTimeMillis();
private final AtomicBoolean fillable = new AtomicBoolean(); private final AtomicBoolean fillable = new AtomicBoolean();
private final FillInterest fillInterest = new FillInterest()
{
@Override
protected void needsFillInterest()
{
if (fillable.getAndSet(false))
fillInterest.fillable();
}
};
private final WriteFlusher writeFlusher = new WriteFlusher(this)
{
@Override
protected void onIncompleteFlush()
{
// No need to do anything.
// See QuicSession.process().
}
};
private final QuicSession session; private final QuicSession session;
private Connection connection;
private final long streamId; private final long streamId;
private boolean open;
public QuicStreamEndPoint(Scheduler scheduler, QuicSession session, long streamId) public QuicStreamEndPoint(Scheduler scheduler, QuicSession session, long streamId)
{ {
@ -81,19 +52,21 @@ public class QuicStreamEndPoint extends IdleTimeout implements EndPoint
} }
@Override @Override
public boolean isOpen() protected void doShutdownInput()
{ {
return open; try
{
session.shutdownInput(streamId);
}
catch (IOException x)
{
if (LOG.isDebugEnabled())
LOG.debug("error shutting down output", x);
}
} }
@Override @Override
public long getCreatedTimeStamp() protected void doShutdownOutput()
{
return createdTimeStamp;
}
@Override
public void shutdownOutput()
{ {
try try
{ {
@ -107,21 +80,19 @@ public class QuicStreamEndPoint extends IdleTimeout implements EndPoint
} }
@Override @Override
public boolean isOutputShutdown() public void onClose(Throwable failure)
{ {
return session.isOutputShutdown(streamId); super.onClose(failure);
try
{
session.sendFinished(streamId);
} }
catch (IOException e)
@Override
public boolean isInputShutdown()
{ {
return session.isInputShutdown(streamId); if (LOG.isDebugEnabled())
LOG.debug("Error sending FIN on stream {}", streamId, e);
} }
session.onClose(streamId);
@Override
public void close(Throwable cause)
{
onClose(cause);
} }
@Override @Override
@ -130,6 +101,8 @@ public class QuicStreamEndPoint extends IdleTimeout implements EndPoint
int pos = BufferUtil.flipToFill(buffer); int pos = BufferUtil.flipToFill(buffer);
int drained = session.fill(streamId, buffer); int drained = session.fill(streamId, buffer);
BufferUtil.flipToFlush(buffer, pos); BufferUtil.flipToFlush(buffer, pos);
if (session.isFinished(streamId))
shutdownInput();
return drained; return drained;
} }
@ -155,7 +128,7 @@ public class QuicStreamEndPoint extends IdleTimeout implements EndPoint
public void onWritable() public void onWritable()
{ {
writeFlusher.completeWrite(); getWriteFlusher().completeWrite();
} }
public Runnable onReadable() public Runnable onReadable()
@ -163,68 +136,22 @@ public class QuicStreamEndPoint extends IdleTimeout implements EndPoint
return () -> return () ->
{ {
//TODO: this is racy //TODO: this is racy
if (!fillInterest.fillable()) if (!getFillInterest().fillable())
fillable.set(true); fillable.set(true);
}; };
} }
@Override @Override
public void fillInterested(Callback callback) throws ReadPendingException protected void onIncompleteFlush()
{ {
fillInterest.register(callback); // No need to do anything.
// See QuicSession.process().
} }
@Override @Override
public boolean tryFillInterested(Callback callback) protected void needsFillInterest() throws IOException
{ {
return fillInterest.tryRegister(callback); if (fillable.getAndSet(false))
} getFillInterest().fillable();
@Override
public boolean isFillInterested()
{
return fillInterest.isInterested();
}
@Override
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
writeFlusher.write(callback, buffers);
}
@Override
public Connection getConnection()
{
return connection;
}
@Override
public void setConnection(Connection connection)
{
this.connection = connection;
}
@Override
public void onOpen()
{
open = true;
}
@Override
public void onClose(Throwable cause)
{
open = false;
}
@Override
public void upgrade(Connection newConnection)
{
throw new UnsupportedOperationException();
}
@Override
protected void onIdleExpired(TimeoutException timeout)
{
} }
} }