Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9
This commit is contained in:
commit
32e9548f0c
|
@ -0,0 +1,192 @@
|
|||
// ========================================================================
|
||||
// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class SelectChannelEndPointInterestsTest
|
||||
{
|
||||
private QueuedThreadPool threadPool;
|
||||
private ScheduledExecutorService scheduler;
|
||||
private ServerSocketChannel connector;
|
||||
private SelectorManager selectorManager;
|
||||
|
||||
public void init(final Interested interested) throws Exception
|
||||
{
|
||||
threadPool = new QueuedThreadPool();
|
||||
threadPool.start();
|
||||
|
||||
scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
connector = ServerSocketChannel.open();
|
||||
connector.bind(new InetSocketAddress("localhost", 0));
|
||||
|
||||
selectorManager = new SelectorManager()
|
||||
{
|
||||
@Override
|
||||
protected void execute(Runnable task)
|
||||
{
|
||||
threadPool.execute(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
|
||||
{
|
||||
return new SelectChannelEndPoint(channel, selector, selectionKey, scheduler, 60000)
|
||||
{
|
||||
@Override
|
||||
protected void onIncompleteFlush()
|
||||
{
|
||||
super.onIncompleteFlush();
|
||||
interested.onIncompleteFlush();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncConnection newConnection(SocketChannel channel, final AsyncEndPoint endPoint, Object attachment)
|
||||
{
|
||||
return new AbstractAsyncConnection(endPoint, threadPool)
|
||||
{
|
||||
@Override
|
||||
public void onFillable()
|
||||
{
|
||||
interested.onFillable(endPoint, this);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
selectorManager.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadBlockedThenWriteBlockedThenReadableThenWritable() throws Exception
|
||||
{
|
||||
final AtomicInteger size = new AtomicInteger(1024 * 1024);
|
||||
final AtomicReference<Exception> failure = new AtomicReference<>();
|
||||
final CountDownLatch latch1 = new CountDownLatch(1);
|
||||
final CountDownLatch latch2 = new CountDownLatch(1);
|
||||
final AtomicBoolean writeBlocked = new AtomicBoolean();
|
||||
init(new Interested()
|
||||
{
|
||||
@Override
|
||||
public void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection)
|
||||
{
|
||||
ByteBuffer input = BufferUtil.allocate(2);
|
||||
int read = fill(endPoint, input);
|
||||
|
||||
if (read == 1)
|
||||
{
|
||||
byte b = input.get();
|
||||
if (b == 1)
|
||||
{
|
||||
connection.fillInterested();
|
||||
|
||||
ByteBuffer output = ByteBuffer.allocate(size.get());
|
||||
endPoint.write(null, new Callback.Empty<>(), output);
|
||||
|
||||
latch1.countDown();
|
||||
}
|
||||
else
|
||||
{
|
||||
latch2.countDown();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
failure.set(new Exception("Unexpectedly read " + read + " bytes"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onIncompleteFlush()
|
||||
{
|
||||
writeBlocked.set(true);
|
||||
}
|
||||
|
||||
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
|
||||
{
|
||||
try
|
||||
{
|
||||
return endPoint.fill(buffer);
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
failure.set(x);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Socket client = new Socket();
|
||||
client.connect(connector.getLocalAddress());
|
||||
client.setSoTimeout(5000);
|
||||
|
||||
SocketChannel server = connector.accept();
|
||||
server.configureBlocking(false);
|
||||
selectorManager.accept(server);
|
||||
|
||||
OutputStream clientOutput = client.getOutputStream();
|
||||
clientOutput.write(1);
|
||||
clientOutput.flush();
|
||||
Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// We do not read to keep the socket write blocked
|
||||
|
||||
clientOutput.write(2);
|
||||
clientOutput.flush();
|
||||
Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Sleep before reading to allow waking up the server only for read
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Now read what was written, waking up the server for write
|
||||
InputStream clientInput = client.getInputStream();
|
||||
while (size.getAndDecrement() > 0)
|
||||
clientInput.read();
|
||||
|
||||
client.close();
|
||||
|
||||
Assert.assertNull(failure.get());
|
||||
}
|
||||
|
||||
private interface Interested
|
||||
{
|
||||
void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection);
|
||||
|
||||
void onIncompleteFlush();
|
||||
}
|
||||
|
||||
}
|
|
@ -1,18 +1,15 @@
|
|||
/*
|
||||
* Copyright (c) 2012 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
// ========================================================================
|
||||
// Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.spdy;
|
||||
|
||||
|
|
|
@ -1,18 +1,15 @@
|
|||
/*
|
||||
* Copyright (c) 2012 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
// ========================================================================
|
||||
// Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.spdy;
|
||||
|
||||
|
@ -35,4 +32,10 @@ public interface ISession extends Session
|
|||
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback<C> callback, C context);
|
||||
|
||||
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback<C> callback, C context);
|
||||
|
||||
/**
|
||||
* <p>Gracefully shuts down this session.</p>
|
||||
* <p>A special item is queued that will close the connection when it will be dequeued.</p>
|
||||
*/
|
||||
public void shutdown();
|
||||
}
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
//========================================================================
|
||||
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
|
||||
//------------------------------------------------------------------------
|
||||
//All rights reserved. This program and the accompanying materials
|
||||
//are made available under the terms of the Eclipse Public License v1.0
|
||||
//and Apache License v2.0 which accompanies this distribution.
|
||||
//The Eclipse Public License is available at
|
||||
//http://www.eclipse.org/legal/epl-v10.html
|
||||
//The Apache License v2.0 is available at
|
||||
//http://www.opensource.org/licenses/apache2.0.php
|
||||
//You may elect to redistribute this code under either of these licenses.
|
||||
//========================================================================
|
||||
// ========================================================================
|
||||
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.spdy;
|
||||
|
||||
|
@ -65,6 +65,7 @@ import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
|
|||
import org.eclipse.jetty.spdy.generator.Generator;
|
||||
import org.eclipse.jetty.spdy.parser.Parser;
|
||||
import org.eclipse.jetty.util.Atomics;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.component.AggregateLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
|
@ -873,6 +874,14 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
|
|||
flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown()
|
||||
{
|
||||
FrameBytes frameBytes = new CloseFrameBytes();
|
||||
append(frameBytes);
|
||||
flush();
|
||||
}
|
||||
|
||||
private void execute(Runnable task)
|
||||
{
|
||||
threadPool.execute(task);
|
||||
|
@ -1300,4 +1309,25 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
|
|||
return String.format("DATA bytes @%x available=%d consumed=%d on %s",dataInfo.hashCode(),dataInfo.available(),dataInfo.consumed(),getStream());
|
||||
}
|
||||
}
|
||||
|
||||
private class CloseFrameBytes extends AbstractFrameBytes<Void>
|
||||
{
|
||||
private CloseFrameBytes()
|
||||
{
|
||||
super(null, new Empty<Void>(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getByteBuffer()
|
||||
{
|
||||
return BufferUtil.EMPTY_BUFFER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete()
|
||||
{
|
||||
super.complete();
|
||||
close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,16 +86,8 @@ public class StandardStreamTest
|
|||
{
|
||||
PushSynInfo pushSynInfo = (PushSynInfo)argument;
|
||||
if (pushSynInfo.getAssociatedStreamId() != associatedStreamId)
|
||||
{
|
||||
System.out.println("streamIds do not match!");
|
||||
return false;
|
||||
}
|
||||
if (pushSynInfo.isClose() != synInfo.isClose())
|
||||
{
|
||||
System.out.println("isClose doesn't match");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return pushSynInfo.isClose() == synInfo.isClose();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,18 +1,15 @@
|
|||
/*
|
||||
* Copyright (c) 2012 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
// ========================================================================
|
||||
// Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.spdy.http;
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ import org.eclipse.jetty.io.AbstractAsyncConnection;
|
|||
import org.eclipse.jetty.io.AsyncEndPoint;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.RuntimeIOException;
|
||||
import org.eclipse.jetty.spdy.api.Session;
|
||||
import org.eclipse.jetty.spdy.parser.Parser;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
@ -32,7 +31,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
|
|||
private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class);
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final Parser parser;
|
||||
private volatile Session session;
|
||||
private volatile ISession session;
|
||||
private volatile boolean idle = false;
|
||||
|
||||
public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
|
||||
|
@ -65,7 +64,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
|
|||
}
|
||||
else if (filled < 0)
|
||||
{
|
||||
close(false);
|
||||
shutdown(session);
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
|
@ -121,16 +120,28 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
|
|||
protected boolean onReadTimeout()
|
||||
{
|
||||
if (idle)
|
||||
session.goAway();
|
||||
goAway(session);
|
||||
return false;
|
||||
}
|
||||
|
||||
protected Session getSession()
|
||||
protected void goAway(ISession session)
|
||||
{
|
||||
if (session != null)
|
||||
session.goAway();
|
||||
}
|
||||
|
||||
private void shutdown(ISession session)
|
||||
{
|
||||
if (session != null)
|
||||
session.shutdown();
|
||||
}
|
||||
|
||||
protected ISession getSession()
|
||||
{
|
||||
return session;
|
||||
}
|
||||
|
||||
protected void setSession(Session session)
|
||||
protected void setSession(ISession session)
|
||||
{
|
||||
this.session = session;
|
||||
}
|
||||
|
|
|
@ -141,7 +141,6 @@ public class ClosedStreamTest extends AbstractTest
|
|||
@Override
|
||||
public void onReply(Stream stream, ReplyInfo replyInfo)
|
||||
{
|
||||
System.out.println("ONREPLY CLIENT CALLED");
|
||||
replyReceivedLatch.countDown();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue