Jetty9 - SPDY: gracefully shutting down the Session in case of read of -1 from the remote peer.

A special item is queued into the Session for flushing, and when it is dequeued it closes the
underlying connection.
This allows pending writes to complete before the underlying connection is closed.
This commit is contained in:
Simone Bordet 2012-07-30 23:01:30 +02:00
parent b4b1bc814c
commit 9c717d552f
3 changed files with 77 additions and 33 deletions

View File

@ -1,18 +1,15 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// ========================================================================
// Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.spdy;
@ -35,4 +32,10 @@ public interface ISession extends Session
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback<C> callback, C context);
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback<C> callback, C context);
/**
* <p>Gracefully shuts down this session.</p>
* <p>A special item is queued that will close the connection when it will be dequeued.</p>
*/
public void shutdown();
}

View File

@ -1,15 +1,15 @@
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.spdy;
@ -65,6 +65,7 @@ import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
@ -873,6 +874,14 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
flush();
}
@Override
public void shutdown()
{
FrameBytes frameBytes = new CloseFrameBytes();
append(frameBytes);
flush();
}
private void execute(Runnable task)
{
threadPool.execute(task);
@ -1300,4 +1309,25 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
return String.format("DATA bytes @%x available=%d consumed=%d on %s",dataInfo.hashCode(),dataInfo.available(),dataInfo.consumed(),getStream());
}
}
private class CloseFrameBytes extends AbstractFrameBytes<Void>
{
private CloseFrameBytes()
{
super(null, new Empty<Void>(), null);
}
@Override
public ByteBuffer getByteBuffer()
{
return BufferUtil.EMPTY_BUFFER;
}
@Override
public void complete()
{
super.complete();
close();
}
}
}

View File

@ -21,7 +21,6 @@ import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -32,7 +31,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class);
private final ByteBufferPool bufferPool;
private final Parser parser;
private volatile Session session;
private volatile ISession session;
private volatile boolean idle = false;
public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
@ -65,7 +64,7 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
}
else if (filled < 0)
{
close(false);
shutdown(session);
return -1;
}
else
@ -121,16 +120,28 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
protected boolean onReadTimeout()
{
if (idle)
session.goAway();
goAway(session);
return false;
}
protected Session getSession()
protected void goAway(ISession session)
{
if (session != null)
session.goAway();
}
private void shutdown(ISession session)
{
if (session != null)
session.shutdown();
}
protected ISession getSession()
{
return session;
}
protected void setSession(Session session)
protected void setSession(ISession session)
{
this.session = session;
}