423930 - SPDY streams are leaked.

With this fix, when streams are closed or failed, the stream is closed
and its idle timeout is canceled, avoiding the leaking.
This commit is contained in:
Simone Bordet 2013-12-13 13:27:11 +01:00
parent f4a5f68bdd
commit 6f316f9887
3 changed files with 235 additions and 79 deletions

View File

@ -114,6 +114,15 @@ public class StandardStream extends IdleTimeout implements IStream
StreamFrameListener listener = this.listener; StreamFrameListener listener = this.listener;
if (listener != null) if (listener != null)
listener.onFailure(this, timeout); listener.onFailure(this, timeout);
// The stream is now gone, we must close it to
// avoid that its idle timeout is rescheduled.
close();
}
private void close()
{
closeState = CloseState.CLOSED;
onClose();
} }
@Override @Override
@ -189,13 +198,13 @@ public class StandardStream extends IdleTimeout implements IStream
if (local) if (local)
throw new IllegalStateException(); throw new IllegalStateException();
else else
closeState = CloseState.CLOSED; close();
break; break;
} }
case REMOTELY_CLOSED: case REMOTELY_CLOSED:
{ {
if (local) if (local)
closeState = CloseState.CLOSED; close();
else else
throw new IllegalStateException(); throw new IllegalStateException();
break; break;
@ -369,12 +378,13 @@ public class StandardStream extends IdleTimeout implements IStream
notIdle(); notIdle();
if (isClosed() || isReset()) if (isClosed() || isReset())
{ {
close();
promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED, promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
"Stream: " + this + " already closed or reset!")); "Stream: " + this + " already closed or reset!"));
return; return;
} }
PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo); PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo);
session.syn(pushSynInfo, null, promise); session.syn(pushSynInfo, null, new StreamPromise(promise));
} }
@Override @Override
@ -393,11 +403,14 @@ public class StandardStream extends IdleTimeout implements IStream
{ {
notIdle(); notIdle();
if (isUnidirectional()) if (isUnidirectional())
{
close();
throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams"); throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
}
openState = OpenState.REPLY_SENT; openState = OpenState.REPLY_SENT;
updateCloseState(replyInfo.isClose(), true); updateCloseState(replyInfo.isClose(), true);
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders()); SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), callback); session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), new StreamCallback(callback));
} }
@Override @Override
@ -417,18 +430,18 @@ public class StandardStream extends IdleTimeout implements IStream
notIdle(); notIdle();
if (!canSend()) if (!canSend())
{ {
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame"); throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
} }
if (isLocallyClosed()) if (isLocallyClosed())
{ {
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream"); throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
} }
// Cannot update the close state here, because the data that we send may // Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size. // be flow controlled, so we need the stream to update the window size.
session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), callback); session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), new StreamCallback(callback));
} }
@Override @Override
@ -448,18 +461,18 @@ public class StandardStream extends IdleTimeout implements IStream
notIdle(); notIdle();
if (!canSend()) if (!canSend())
{ {
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame"); throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
} }
if (isLocallyClosed()) if (isLocallyClosed())
{ {
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream"); throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
} }
updateCloseState(headersInfo.isClose(), true); updateCloseState(headersInfo.isClose(), true);
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders()); HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), callback); session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), new StreamCallback(callback));
} }
@Override @Override
@ -527,4 +540,55 @@ public class StandardStream extends IdleTimeout implements IStream
{ {
OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
} }
private class StreamCallback implements Callback
{
private final Callback callback;
private StreamCallback()
{
this(new Adapter());
}
private StreamCallback(Callback callback)
{
this.callback = callback;
}
@Override
public void succeeded()
{
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
close();
callback.failed(x);
}
}
private class StreamPromise implements Promise<Stream>
{
private final Promise<Stream> promise;
public StreamPromise(Promise<Stream> promise)
{
this.promise = promise;
}
@Override
public void succeeded(Stream result)
{
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
{
close();
promise.failed(x);
}
}
} }

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.spdy; package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.HashSet; import java.util.HashSet;
@ -38,6 +28,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -46,6 +38,7 @@ import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo; import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Session;
@ -58,7 +51,6 @@ import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.DataFrame; import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.spdy.frames.SettingsFrame; import org.eclipse.jetty.spdy.frames.SettingsFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame; import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.generator.Generator; import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields; import org.eclipse.jetty.util.Fields;
@ -66,9 +58,10 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -78,6 +71,16 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest public class StandardSessionTest
{ {
@ -101,7 +104,7 @@ public class StandardSessionTest
public void setUp() throws Exception public void setUp() throws Exception
{ {
threadPool = Executors.newCachedThreadPool(); threadPool = Executors.newCachedThreadPool();
scheduler = new TimerScheduler(); scheduler = new ScheduledExecutorScheduler();
scheduler.start(); scheduler.start();
session = new StandardSession(VERSION, bufferPool, scheduler, controller, endPoint, null, 1, null, session = new StandardSession(VERSION, bufferPool, scheduler, controller, endPoint, null, 1, null,
generator, new FlowControlStrategy.None()); generator, new FlowControlStrategy.None());
@ -131,8 +134,7 @@ public class StandardSessionTest
callback.succeeded(); callback.succeeded();
return null; return null;
} }
}) }).when(controller).write(any(Callback.class), any(ByteBuffer.class));
.when(controller).write(any(Callback.class), any(ByteBuffer.class));
} }
@Test @Test
@ -429,14 +431,38 @@ public class StandardSessionTest
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testControllerWriteFailsInEndPointFlush() throws InterruptedException public void testControllerWriteFails() throws Exception
{ {
setControllerWriteExpectation(true); final AtomicInteger writes = new AtomicInteger();
final AtomicBoolean fail = new AtomicBoolean();
Controller controller = new Controller()
{
@Override
public void write(Callback callback, ByteBuffer... buffers)
{
writes.incrementAndGet();
if (fail.get())
callback.failed(new ClosedChannelException());
else
callback.succeeded();
}
final CountDownLatch failedCalledLatch = new CountDownLatch(2); @Override
SynStreamFrame synStreamFrame = new SynStreamFrame(VERSION, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null); public void close(boolean onlyOutput)
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null); {
}
};
ISession session = new StandardSession(VERSION, bufferPool, scheduler, controller, endPoint, null, 1, null, generator, null);
IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
stream.updateWindowSize(8192); stream.updateWindowSize(8192);
// Send a reply to comply with the API usage
stream.reply(new ReplyInfo(false), new Callback.Adapter());
// Make the controller fail
fail.set(true);
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
Callback.Adapter callback = new Callback.Adapter() Callback.Adapter callback = new Callback.Adapter()
{ {
@Override @Override
@ -445,14 +471,11 @@ public class StandardSessionTest
failedCalledLatch.countDown(); failedCalledLatch.countDown();
} }
}; };
// Data frame should fail on controller.write()
// first data frame should fail on controller.write()
stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "data", false), callback);
// second data frame should fail without controller.write() as the connection is expected to be broken after first controller.write() call failed.
stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "data", false), callback); stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "data", false), callback);
verify(controller, times(1)).write(any(Callback.class), any(ByteBuffer.class)); Assert.assertEquals(2, writes.get());
assertThat("Callback.failed has been called twice", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true)); Assert.assertTrue(failedCalledLatch.await(5, TimeUnit.SECONDS));
} }
@Test @Test

View File

@ -18,6 +18,39 @@
package org.eclipse.jetty.spdy; package org.eclipse.jetty.spdy;
import java.nio.channels.ClosedChannelException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -29,33 +62,6 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class StandardStreamTest public class StandardStreamTest
{ {
@ -71,9 +77,12 @@ public class StandardStreamTest
scheduler.start(); scheduler.start();
} }
/** @After
* Test method for {@link Stream#push(org.eclipse.jetty.spdy.api.PushInfo)}. public void tearDown() throws Exception
*/ {
scheduler.stop();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testSyn() public void testSyn()
@ -144,46 +153,106 @@ public class StandardStreamTest
@Test @Test
@Slow @Slow
public void testIdleTimeout() throws InterruptedException, ExecutionException, TimeoutException public void testIdleTimeout() throws Exception
{ {
final CountDownLatch onFailCalledLatch = new CountDownLatch(1);
IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
stream.setIdleTimeout(500); long idleTimeout = 500;
stream.setIdleTimeout(idleTimeout);
final AtomicInteger failureCount = new AtomicInteger();
final CountDownLatch failureLatch = new CountDownLatch(1);
stream.setStreamFrameListener(new StreamFrameListener.Adapter() stream.setStreamFrameListener(new StreamFrameListener.Adapter()
{ {
@Override @Override
public void onFailure(Stream stream, Throwable x) public void onFailure(Stream stream, Throwable x)
{ {
assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class))); assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class)));
onFailCalledLatch.countDown(); failureCount.incrementAndGet();
failureLatch.countDown();
} }
}); });
stream.process(new StringDataInfo("string", false)); stream.process(new StringDataInfo("string", false));
Thread.sleep(1000);
assertThat("onFailure has been called", onFailCalledLatch.await(5, TimeUnit.SECONDS), is(true)); // Wait more than (2 * idleTimeout) to be sure to trigger a failureCount > 1
Thread.sleep(3 * idleTimeout);
assertThat("onFailure has been called", failureLatch.await(5, TimeUnit.SECONDS), is(true));
Assert.assertEquals(1, failureCount.get());
} }
@Test @Test
@Slow @Slow
public void testIdleTimeoutIsInterruptedWhenReceiving() throws InterruptedException, ExecutionException, public void testIdleTimeoutIsInterruptedWhenReceiving() throws Exception
TimeoutException
{ {
final CountDownLatch onFailCalledLatch = new CountDownLatch(1); final CountDownLatch failureLatch = new CountDownLatch(1);
IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
long idleTimeout = 1000;
stream.setIdleTimeout(idleTimeout);
stream.setStreamFrameListener(new StreamFrameListener.Adapter() stream.setStreamFrameListener(new StreamFrameListener.Adapter()
{ {
@Override @Override
public void onFailure(Stream stream, Throwable x) public void onFailure(Stream stream, Throwable x)
{ {
assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class))); assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class)));
onFailCalledLatch.countDown(); failureLatch.countDown();
} }
}); });
stream.process(new SynStreamFrame(SPDY.V3, (byte)0, 1, 0, (byte)0, (short)0, null));
stream.process(new StringDataInfo("string", false)); stream.process(new StringDataInfo("string", false));
Thread.sleep(500); Thread.sleep(idleTimeout / 2);
stream.process(new StringDataInfo("string", false)); stream.process(new StringDataInfo("string", false));
Thread.sleep(500); Thread.sleep(idleTimeout / 2);
assertThat("onFailure has been called", onFailCalledLatch.await(1, TimeUnit.SECONDS), is(false)); stream.process(new StringDataInfo("string", false));
Thread.sleep(idleTimeout / 2);
stream.process(new StringDataInfo("string", true));
stream.reply(new ReplyInfo(true), new Callback.Adapter());
Thread.sleep(idleTimeout);
assertThat("onFailure has not been called", failureLatch.await(idleTimeout, TimeUnit.MILLISECONDS), is(false));
} }
@Test
@Slow
public void testReplyFailureClosesStream() throws Exception
{
ISession session = new StandardSession(SPDY.V3, null, null, null, null, null, 1, null, null, null)
{
@Override
public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
{
callback.failed(new ClosedChannelException());
}
};
IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
final AtomicInteger failureCount = new AtomicInteger();
stream.setStreamFrameListener(new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Stream stream, Throwable x)
{
failureCount.incrementAndGet();
}
});
long idleTimeout = 500;
stream.setIdleTimeout(idleTimeout);
stream.process(new SynStreamFrame(SPDY.V3, (byte)0, 1, 0, (byte)0, (short)0, null));
final CountDownLatch failureLatch = new CountDownLatch(1);
stream.reply(new ReplyInfo(false), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
failureLatch.countDown();
}
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Make sure that the idle timeout never fires, since the failure above should have closed the stream
Thread.sleep(3 * idleTimeout);
Assert.assertEquals(0, failureCount.get());
Assert.assertTrue(stream.isClosed());
}
} }