Issue #372 (Data race in HttpReceiverOverHTTP2)
Fixed implementations of onData() to properly complete the callback and to copy the data if they use it asynchronously.
This commit is contained in:
parent
aa6de825b7
commit
b8fcc5112f
|
@ -357,6 +357,7 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
dataLatch.countDown();
|
dataLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -101,6 +101,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
warmupLatch.countDown();
|
warmupLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -188,6 +189,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
warmupLatch.countDown();
|
warmupLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -273,6 +275,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
warmupLatch.countDown();
|
warmupLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -298,6 +301,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
pushLatch.countDown();
|
pushLatch.countDown();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -325,6 +329,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
if (frame.isEndStream())
|
if (frame.isEndStream())
|
||||||
secondaryResponseLatch.countDown();
|
secondaryResponseLatch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -372,6 +377,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
warmupLatch.countDown();
|
warmupLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -655,6 +661,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
if (frame.isEndStream())
|
if (frame.isEndStream())
|
||||||
warmupLatch.countDown();
|
warmupLatch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -676,6 +683,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
if (frame.isEndStream())
|
if (frame.isEndStream())
|
||||||
primaryResponseLatch.countDown();
|
primaryResponseLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,7 +129,12 @@ public class StreamCloseTest extends AbstractTest
|
||||||
public void onData(final Stream stream, DataFrame frame, final Callback callback)
|
public void onData(final Stream stream, DataFrame frame, final Callback callback)
|
||||||
{
|
{
|
||||||
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
|
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
|
||||||
stream.data(frame, new Callback()
|
|
||||||
|
// We must copy the data that we send asynchronously.
|
||||||
|
ByteBuffer data = frame.getData();
|
||||||
|
ByteBuffer copy = ByteBuffer.allocate(data.remaining());
|
||||||
|
copy.put(data).flip();
|
||||||
|
stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -155,6 +160,7 @@ public class StreamCloseTest extends AbstractTest
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
// The sent data callback may not be notified yet here.
|
// The sent data callback may not be notified yet here.
|
||||||
|
callback.succeeded();
|
||||||
completeLatch.countDown();
|
completeLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -70,6 +70,10 @@ public class StreamCountTest extends AbstractTest
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
|
||||||
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
|
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -144,6 +148,10 @@ public class StreamCountTest extends AbstractTest
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
|
||||||
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
|
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -171,6 +171,7 @@ public class StreamResetTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
stream1DataLatch.countDown();
|
stream1DataLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -186,6 +187,7 @@ public class StreamResetTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
|
callback.succeeded();
|
||||||
stream2DataLatch.countDown();
|
stream2DataLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue