Fixes #2548 - Possible deadlock failing HTTP/2 stream creation.

Now failing the callback outside of the synchronized block.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-05-16 22:18:47 +02:00
parent 01f7aecc4e
commit 16b7359ae5
1 changed files with 49 additions and 43 deletions

View File

@ -472,31 +472,36 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override @Override
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener) public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{ {
// Synchronization is necessary to atomically create try
// the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{ {
int streamId = frame.getStreamId(); // Synchronization is necessary to atomically create
if (streamId <= 0) // the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{ {
streamId = streamIds.getAndAdd(2); int streamId = frame.getStreamId();
PriorityFrame priority = frame.getPriority(); if (streamId <= 0)
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), {
priority.getWeight(), priority.isExclusive()); streamId = streamIds.getAndAdd(2);
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); PriorityFrame priority = frame.getPriority();
} priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
final IStream stream = createLocalStream(streamId, promise); priority.getWeight(), priority.isExclusive());
if (stream == null) frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
return; }
stream.setListener(listener); IStream stream = createLocalStream(streamId);
stream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream)); ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
queued = flusher.append(entry); queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
if (queued)
flusher.iterate();
}
catch (Throwable x)
{
promise.failed(x);
} }
// Iterate outside the synchronized block.
if (queued)
flusher.iterate();
} }
@Override @Override
@ -517,25 +522,30 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override @Override
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener) public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
{ {
// Synchronization is necessary to atomically create try
// the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{ {
int streamId = streamIds.getAndAdd(2); // Synchronization is necessary to atomically create
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); // the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{
int streamId = streamIds.getAndAdd(2);
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
final IStream pushStream = createLocalStream(streamId, promise); IStream pushStream = createLocalStream(streamId);
if (pushStream == null) pushStream.setListener(listener);
return;
pushStream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream)); ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
queued = flusher.append(entry); queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
if (queued)
flusher.iterate();
}
catch (Throwable x)
{
promise.failed(x);
} }
// Iterate outside the synchronized block.
if (queued)
flusher.iterate();
} }
@Override @Override
@ -676,17 +686,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
protected IStream createLocalStream(int streamId, Promise<Stream> promise) protected IStream createLocalStream(int streamId)
{ {
while (true) while (true)
{ {
int localCount = localStreamCount.get(); int localCount = localStreamCount.get();
int maxCount = getMaxLocalStreams(); int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount) if (maxCount >= 0 && localCount >= maxCount)
{ throw new IllegalStateException("Max local stream count " + maxCount + " exceeded");
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1)) if (localStreamCount.compareAndSet(localCount, localCount + 1))
break; break;
} }
@ -702,8 +709,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
else else
{ {
promise.failed(new IllegalStateException("Duplicate stream " + streamId)); throw new IllegalStateException("Duplicate stream " + streamId);
return null;
} }
} }