fill kinda works

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-12 13:35:12 +01:00 committed by Simone Bordet
parent 45e45e6cf2
commit 24dbb4c084
2 changed files with 60 additions and 14 deletions

View File

@ -23,6 +23,7 @@ import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
@ -182,9 +183,12 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
protected SocketAddress doReadDatagram(SelectableChannel channel) throws IOException protected SocketAddress doReadDatagram(SelectableChannel channel) throws IOException
{ {
ByteBuffer buffer = getByteBufferPool().acquire(1200, true); ByteBuffer buffer = getByteBufferPool().acquire(1200, true);
BufferUtil.flipToFill(buffer);
LOG.info("doReadDatagram {}", channel); LOG.info("doReadDatagram {}", channel);
DatagramChannel datagramChannel = (DatagramChannel)channel; DatagramChannel datagramChannel = (DatagramChannel)channel;
SocketAddress peer = datagramChannel.receive(buffer); SocketAddress peer = datagramChannel.receive(buffer);
buffer.flip();
LOG.info("doReadDatagram received {} byte(s)", buffer.remaining());
SocketAddress localAddress = datagramChannel.getLocalAddress(); SocketAddress localAddress = datagramChannel.getLocalAddress();
boolean[] created = new boolean[1]; boolean[] created = new boolean[1];

View File

@ -11,7 +11,9 @@ import java.nio.channels.WritePendingException;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.FillInterest;
import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -20,37 +22,58 @@ public class ServerDatagramEndPoint implements EndPoint
{ {
private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class); private static final Logger LOG = LoggerFactory.getLogger(ServerDatagramEndPoint.class);
public ServerDatagramEndPoint(SocketAddress localAddress, SocketAddress remoteAddress, ByteBuffer buffer, SelectableChannel channel) private final FillInterest fillInterest = new FillInterest() {
@Override
protected void needsFillInterest() throws IOException
{ {
} }
};
private final SocketAddress localAddress;
private final SocketAddress remoteAddress;
private final SelectableChannel channel;
private ManagedSelector selector;
private SelectionKey selectionKey;
private Connection connection;
private ByteBuffer data;
private boolean open;
public ServerDatagramEndPoint(SocketAddress localAddress, SocketAddress remoteAddress, ByteBuffer buffer, SelectableChannel channel)
{
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
this.channel = channel;
this.data = buffer;
}
public void init(ManagedSelector selector, SelectionKey selectionKey) public void init(ManagedSelector selector, SelectionKey selectionKey)
{ {
this.selector = selector;
this.selectionKey = selectionKey;
} }
public void onData(ByteBuffer buffer) public void onData(ByteBuffer buffer)
{ {
this.data = buffer;
fillInterest.fillable();
} }
@Override @Override
public InetSocketAddress getLocalAddress() public InetSocketAddress getLocalAddress()
{ {
throw new UnsupportedOperationException(); return (InetSocketAddress)localAddress;
} }
@Override @Override
public InetSocketAddress getRemoteAddress() public InetSocketAddress getRemoteAddress()
{ {
throw new UnsupportedOperationException(); return (InetSocketAddress)remoteAddress;
} }
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {
return false; return open;
} }
@Override @Override
@ -86,7 +109,21 @@ public class ServerDatagramEndPoint implements EndPoint
@Override @Override
public int fill(ByteBuffer buffer) throws IOException public int fill(ByteBuffer buffer) throws IOException
{ {
throw new UnsupportedOperationException(); if (data != null)
{
int before = data.remaining();
LOG.info("fill; bytes remaining: {} byte(s)", before);
BufferUtil.flipToFill(buffer);
buffer.put(data);
buffer.flip();
int after = data.remaining();
if (after == 0)
data = null;
int filled = before - after;
LOG.info("filled {} byte(s)", filled);
return filled;
}
return 0;
} }
@Override @Override
@ -98,7 +135,7 @@ public class ServerDatagramEndPoint implements EndPoint
@Override @Override
public Object getTransport() public Object getTransport()
{ {
throw new UnsupportedOperationException(); return this.channel;
} }
@Override @Override
@ -116,19 +153,24 @@ public class ServerDatagramEndPoint implements EndPoint
@Override @Override
public void fillInterested(Callback callback) throws ReadPendingException public void fillInterested(Callback callback) throws ReadPendingException
{ {
throw new UnsupportedOperationException(); fillInterest.register(callback);
if (data != null)
fillInterest.fillable();
} }
@Override @Override
public boolean tryFillInterested(Callback callback) public boolean tryFillInterested(Callback callback)
{ {
throw new UnsupportedOperationException(); boolean registered = fillInterest.tryRegister(callback);
if (registered && data != null)
fillInterest.fillable();
return registered;
} }
@Override @Override
public boolean isFillInterested() public boolean isFillInterested()
{ {
throw new UnsupportedOperationException(); return fillInterest.isInterested();
} }
@Override @Override
@ -143,8 +185,6 @@ public class ServerDatagramEndPoint implements EndPoint
return connection; return connection;
} }
private Connection connection;
@Override @Override
public void setConnection(Connection connection) public void setConnection(Connection connection)
{ {
@ -154,13 +194,15 @@ public class ServerDatagramEndPoint implements EndPoint
@Override @Override
public void onOpen() public void onOpen()
{ {
open = true;
LOG.info("onOpen"); LOG.info("onOpen");
} }
@Override @Override
public void onClose(Throwable cause) public void onClose(Throwable cause)
{ {
throw new UnsupportedOperationException(); LOG.info("onClose");
open = false;
} }
@Override @Override