Jetty9 - Test for better handling for I/O interests.
This commit is contained in:
parent
e75e0e9a04
commit
35b61feae2
|
@ -0,0 +1,192 @@
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
|
||||||
|
package org.eclipse.jetty.io;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.nio.channels.ServerSocketChannel;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class SelectChannelEndPointInterestsTest
|
||||||
|
{
|
||||||
|
private QueuedThreadPool threadPool;
|
||||||
|
private ScheduledExecutorService scheduler;
|
||||||
|
private ServerSocketChannel connector;
|
||||||
|
private SelectorManager selectorManager;
|
||||||
|
|
||||||
|
public void init(final Interested interested) throws Exception
|
||||||
|
{
|
||||||
|
threadPool = new QueuedThreadPool();
|
||||||
|
threadPool.start();
|
||||||
|
|
||||||
|
scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
|
connector = ServerSocketChannel.open();
|
||||||
|
connector.bind(new InetSocketAddress("localhost", 0));
|
||||||
|
|
||||||
|
selectorManager = new SelectorManager()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void execute(Runnable task)
|
||||||
|
{
|
||||||
|
threadPool.execute(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
|
||||||
|
{
|
||||||
|
return new SelectChannelEndPoint(channel, selector, selectionKey, scheduler, 60000)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void onIncompleteFlush()
|
||||||
|
{
|
||||||
|
super.onIncompleteFlush();
|
||||||
|
interested.onIncompleteFlush();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AsyncConnection newConnection(SocketChannel channel, final AsyncEndPoint endPoint, Object attachment)
|
||||||
|
{
|
||||||
|
return new AbstractAsyncConnection(endPoint, threadPool)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onFillable()
|
||||||
|
{
|
||||||
|
interested.onFillable(endPoint, this);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
selectorManager.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadBlockedThenWriteBlockedThenReadableThenWritable() throws Exception
|
||||||
|
{
|
||||||
|
final AtomicInteger size = new AtomicInteger(1024 * 1024);
|
||||||
|
final AtomicReference<Exception> failure = new AtomicReference<>();
|
||||||
|
final CountDownLatch latch1 = new CountDownLatch(1);
|
||||||
|
final CountDownLatch latch2 = new CountDownLatch(1);
|
||||||
|
final AtomicBoolean writeBlocked = new AtomicBoolean();
|
||||||
|
init(new Interested()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection)
|
||||||
|
{
|
||||||
|
ByteBuffer input = BufferUtil.allocate(2);
|
||||||
|
int read = fill(endPoint, input);
|
||||||
|
|
||||||
|
if (read == 1)
|
||||||
|
{
|
||||||
|
byte b = input.get();
|
||||||
|
if (b == 1)
|
||||||
|
{
|
||||||
|
connection.fillInterested();
|
||||||
|
|
||||||
|
ByteBuffer output = ByteBuffer.allocate(size.get());
|
||||||
|
endPoint.write(null, new Callback.Empty<>(), output);
|
||||||
|
|
||||||
|
latch1.countDown();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
latch2.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
failure.set(new Exception("Unexpectedly read " + read + " bytes"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onIncompleteFlush()
|
||||||
|
{
|
||||||
|
writeBlocked.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return endPoint.fill(buffer);
|
||||||
|
}
|
||||||
|
catch (IOException x)
|
||||||
|
{
|
||||||
|
failure.set(x);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Socket client = new Socket();
|
||||||
|
client.connect(connector.getLocalAddress());
|
||||||
|
client.setSoTimeout(5000);
|
||||||
|
|
||||||
|
SocketChannel server = connector.accept();
|
||||||
|
server.configureBlocking(false);
|
||||||
|
selectorManager.accept(server);
|
||||||
|
|
||||||
|
OutputStream clientOutput = client.getOutputStream();
|
||||||
|
clientOutput.write(1);
|
||||||
|
clientOutput.flush();
|
||||||
|
Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// We do not read to keep the socket write blocked
|
||||||
|
|
||||||
|
clientOutput.write(2);
|
||||||
|
clientOutput.flush();
|
||||||
|
Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Sleep before reading to allow waking up the server only for read
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Now read what was written, waking up the server for write
|
||||||
|
InputStream clientInput = client.getInputStream();
|
||||||
|
while (size.getAndDecrement() > 0)
|
||||||
|
clientInput.read();
|
||||||
|
|
||||||
|
client.close();
|
||||||
|
|
||||||
|
Assert.assertNull(failure.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface Interested
|
||||||
|
{
|
||||||
|
void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection);
|
||||||
|
|
||||||
|
void onIncompleteFlush();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue