cleanup SelectorManager accept FSM
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
3c1f1d3fe0
commit
ca4562ca8e
|
@ -23,6 +23,7 @@ import java.nio.channels.Selector;
|
|||
import java.util.EventListener;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
|
@ -42,7 +43,6 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
private final ServerDatagramSelectorManager _manager;
|
||||
private volatile DatagramChannel _datagramChannel;
|
||||
private volatile int _localPort = -1;
|
||||
private Closeable _acceptor;
|
||||
|
||||
public ServerDatagramConnector(
|
||||
@Name("server") Server server,
|
||||
|
@ -77,13 +77,12 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
for (EventListener l : getBeans(SelectorManager.SelectorManagerListener.class))
|
||||
_manager.addEventListener(l);
|
||||
super.doStart();
|
||||
_acceptor = _manager.datagramReader(_datagramChannel);
|
||||
_manager.accept(_datagramChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
IO.close(_acceptor);
|
||||
super.doStop();
|
||||
for (EventListener l : getBeans(EventListener.class))
|
||||
_manager.removeEventListener(l);
|
||||
|
@ -171,12 +170,11 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
super(executor, scheduler, selectors);
|
||||
}
|
||||
|
||||
public Closeable datagramReader(SelectableChannel server)
|
||||
@Override
|
||||
public void accept(SelectableChannel channel, Object attachment)
|
||||
{
|
||||
ManagedSelector selector = chooseSelector();
|
||||
DatagramReader reader = new DatagramReader(server);
|
||||
selector.submit(reader);
|
||||
return reader;
|
||||
selector.submit(new Accept(channel, attachment));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -197,15 +195,17 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
return String.format("DatagramSelectorManager@%s", ServerDatagramConnector.this);
|
||||
}
|
||||
|
||||
class DatagramReader implements ManagedSelector.SelectorUpdate, ManagedSelector.Selectable, Closeable
|
||||
class Accept implements ManagedSelector.SelectorUpdate, ManagedSelector.Selectable, Runnable, Closeable
|
||||
{
|
||||
private final AtomicBoolean failed = new AtomicBoolean();
|
||||
private final SelectableChannel _channel;
|
||||
private volatile boolean endPointCreated;
|
||||
private final Object _attachment;
|
||||
private volatile SelectionKey _key;
|
||||
|
||||
DatagramReader(SelectableChannel channel)
|
||||
Accept(SelectableChannel channel, Object attachment)
|
||||
{
|
||||
_channel = channel;
|
||||
_attachment = attachment;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -219,9 +219,7 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
IO.close(_channel);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Unable to register OP_READ on selector for {}", _channel, x);
|
||||
failed(x);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -229,24 +227,23 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
public Runnable onSelected()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("DatagramReader onSelected");
|
||||
if (!endPointCreated)
|
||||
LOG.debug("Accept onSelected");
|
||||
|
||||
_key.interestOps(0);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
// TODO needs to be dispatched.
|
||||
chooseSelector().createEndPoint(_channel, _key);
|
||||
endPointCreated = true;
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
IO.close(_datagramChannel);
|
||||
// TODO: is this enough of we need to notify someone?
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("createEndPoint failed for channel {}", _channel, x);
|
||||
}
|
||||
chooseSelector().createEndPoint(_channel, _key);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
failed(x);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -267,6 +264,15 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
// Implements AbstractConnector.setAccepting(boolean).
|
||||
chooseSelector().submit(selector -> _key.cancel());
|
||||
}
|
||||
|
||||
private void failed(Throwable failure)
|
||||
{
|
||||
if (failed.compareAndSet(false, true))
|
||||
{
|
||||
IO.close(_channel);
|
||||
_manager.connectionFailed(_channel, failure, _attachment);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ public class ServerDatagramEndPoint extends AbstractEndPoint implements ManagedS
|
|||
private SelectionKey _key;
|
||||
private boolean _updatePending;
|
||||
// The current value for interestOps.
|
||||
private int _currentInterestOps = SelectionKey.OP_READ; // See DatagramReader.update()
|
||||
private int _currentInterestOps;
|
||||
// The desired value for interestOps.
|
||||
private int _desiredInterestOps;
|
||||
|
||||
|
|
Loading…
Reference in New Issue