Merge pull request #3347 from shawnsmith/jetty-9.4.x-websocket-suspend-resume

Fix race condition between onFillable() and resume()
This commit is contained in:
Simone Bordet 2019-02-12 15:08:43 +01:00 committed by GitHub
commit a16d98ffd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 204 additions and 18 deletions

View File

@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractConnection;
@ -125,12 +124,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final Generator generator;
private final Parser parser;
private final WebSocketPolicy policy;
private final AtomicBoolean suspendToken;
private final Suspender suspender;
private final FrameFlusher flusher;
private final String id;
private WebSocketSession session;
private List<ExtensionConfig> extensions;
private boolean isFilling;
private ByteBuffer prefillBuffer;
private ReadMode readMode = ReadMode.PARSE;
private IOState ioState;
@ -147,7 +145,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.parser = new Parser(policy,bufferPool);
this.scheduler = scheduler;
this.extensions = new ArrayList<>();
this.suspendToken = new AtomicBoolean(false);
this.suspender = new Suspender();
this.ioState = new IOState();
this.ioState.addListener(this);
this.flusher = new Flusher(bufferPool,generator,endp);
@ -302,7 +300,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public boolean isReading()
{
return isFilling;
return !suspender.isSuspended();
}
/**
@ -384,8 +382,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
try
{
isFilling = true;
if(readMode == ReadMode.PARSE)
{
readMode = readParse(buffer);
@ -400,14 +396,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
bufferPool.release(buffer);
}
if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
if (readMode == ReadMode.EOF)
{
suspender.eof();
}
else if (!suspender.activateRequestedSuspend())
{
fillInterested();
}
else
{
isFilling = false;
}
}
@Override
@ -586,12 +582,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void resume()
{
if (suspendToken.getAndSet(false))
if (suspender.requestResume())
{
if (!isReading())
{
fillInterested();
}
fillInterested();
}
}
@ -627,7 +620,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public SuspendToken suspend()
{
suspendToken.set(true);
suspender.requestSuspend();
return this;
}

View File

@ -0,0 +1,102 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import java.util.concurrent.atomic.AtomicReference;
class Suspender
{
private enum State
{
/** Not suspended. */
NORMAL,
/** Suspend requested but not yet taken effect. */
PENDING,
/** Suspended but resumable. */
SUSPENDED,
/** Permanently suspended (terminal state). */
EOF,
}
private final AtomicReference<State> ref = new AtomicReference<>(State.NORMAL);
boolean isSuspended()
{
State state = ref.get();
return state == State.SUSPENDED || state == State.EOF;
}
/**
* Requests that activity be suspended the next time {@link #activateRequestedSuspend()} is called.
*/
void requestSuspend()
{
// Transition NORMAL -> PENDING
State state;
do
{
state = ref.get();
}
while (state == State.NORMAL && !ref.compareAndSet(state, State.PENDING));
}
/**
* Returns true if activity is suspended, whether or not it was already suspended.
*/
boolean activateRequestedSuspend()
{
// Transition PENDING -> SUSPENDED
State state;
do
{
state = ref.get();
}
while (state == State.PENDING && !ref.compareAndSet(state, State.SUSPENDED));
return state != State.NORMAL;
}
/**
* Returns true if activity was suspended and should now resume.
*/
boolean requestResume()
{
// Transition PENDING|SUSPENDED -> NORMAL
State state;
do
{
state = ref.get();
}
while ((state == State.PENDING || state == State.SUSPENDED) && !ref.compareAndSet(state, State.NORMAL));
return state == State.SUSPENDED;
}
void eof()
{
ref.set(State.EOF);
}
@Override
public String toString()
{
return ref.get().toString();
}
}

View File

@ -0,0 +1,91 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import org.junit.jupiter.api.Test;
public class SuspenderTest {
@Test
public void testNotSuspended()
{
Suspender suspender = new Suspender();
assertThat("Initially normal", suspender.isSuspended(), is(false));
assertThat("No pending suspend to activate", suspender.activateRequestedSuspend(), is(false));
assertThat("No pending suspend to activate", suspender.isSuspended(), is(false));
assertThat("No pending suspend to resume", suspender.requestResume(), is(false));
assertThat("No pending suspend to resume", suspender.isSuspended(), is(false));
}
@Test
public void testSuspendThenResume()
{
Suspender suspender = new Suspender();
assertThat("Initially suspended", suspender.isSuspended(), is(false));
suspender.requestSuspend();
assertThat("Requesting suspend doesn't take effect immediately", suspender.isSuspended(), is(false));
assertThat("Resume from pending requires no followup", suspender.requestResume(), is(false));
assertThat("Resume from pending requires no followup", suspender.isSuspended(), is(false));
assertThat("Requested suspend was discarded", suspender.activateRequestedSuspend(), is(false));
assertThat("Requested suspend was discarded", suspender.isSuspended(), is(false));
}
@Test
public void testSuspendThenActivateThenResume()
{
Suspender suspender = new Suspender();
assertThat("Initially suspended", suspender.isSuspended(), is(false));
suspender.requestSuspend();
assertThat("Requesting suspend doesn't take effect immediately", suspender.isSuspended(), is(false));
assertThat("Suspend activated", suspender.activateRequestedSuspend(), is(true));
assertThat("Suspend activated", suspender.isSuspended(), is(true));
assertThat("Resumed", suspender.requestResume(), is(true));
assertThat("Resumed", suspender.isSuspended(), is(false));
}
@Test
public void testEof()
{
Suspender suspender = new Suspender();
suspender.eof();
assertThat(suspender.isSuspended(), is(true));
assertThat(suspender.activateRequestedSuspend(), is(true));
suspender.requestSuspend();
assertThat(suspender.isSuspended(), is(true));
assertThat(suspender.activateRequestedSuspend(), is(true));
assertThat(suspender.isSuspended(), is(true));
assertThat(suspender.requestResume(), is(false));
assertThat(suspender.isSuspended(), is(true));
}
}