Issue #3246 Avoid concurrent raw write on fuzzer client

Signed-off-by: Greg Wilkins <gregw@webtide.com>
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Greg Wilkins 2019-01-10 18:57:38 +11:00 committed by Lachlan Roberts
parent 40f1d5866f
commit c84f97e983
2 changed files with 27 additions and 17 deletions

View File

@ -55,6 +55,8 @@ abstract public class WriteFlusher
private final EndPoint _endPoint; private final EndPoint _endPoint;
private final AtomicReference<State> _state = new AtomicReference<>(); private final AtomicReference<State> _state = new AtomicReference<>();
Throwable last;
static static
{ {
// fill the state machine // fill the state machine
@ -270,7 +272,13 @@ abstract public class WriteFlusher
LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers)); LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
if (!updateState(__IDLE, __WRITING)) if (!updateState(__IDLE, __WRITING))
{
if (last!=null)
last.printStackTrace();
throw new WritePendingException(); throw new WritePendingException();
}
last = new Throwable();
try try
{ {

View File

@ -45,7 +45,7 @@ import org.eclipse.jetty.websocket.core.internal.Generator;
public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable
{ {
private final LocalServer server; private final LocalServer server;
private final RawWebSocketClient rawClient; private final WebSocketCoreClient client;
private final RawUpgradeRequest upgradeRequest; private final RawUpgradeRequest upgradeRequest;
private final UnitGenerator generator; private final UnitGenerator generator;
private final FrameCapture frameCapture; private final FrameCapture frameCapture;
@ -65,9 +65,9 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{ {
super(); super();
this.server = server; this.server = server;
this.rawClient = new RawWebSocketClient(); this.client = new WebSocketCoreClient();
CompletableFuture<FrameCapture> futureOnCapture = new CompletableFuture<>(); CompletableFuture<FrameCapture> futureOnCapture = new CompletableFuture<>();
this.upgradeRequest = new RawUpgradeRequest(rawClient, wsURI, futureOnCapture); this.upgradeRequest = new RawUpgradeRequest(client, wsURI, futureOnCapture);
if (requestHeaders != null) if (requestHeaders != null)
{ {
HttpFields fields = this.upgradeRequest.getHeaders(); HttpFields fields = this.upgradeRequest.getHeaders();
@ -77,10 +77,10 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
fields.put(name, value); fields.put(name, value);
}); });
} }
this.rawClient.start(); this.client.start();
this.generator = new UnitGenerator(Behavior.CLIENT); this.generator = new UnitGenerator(Behavior.CLIENT);
CompletableFuture<FrameHandler.CoreSession> futureHandler = this.rawClient.connect(upgradeRequest); CompletableFuture<FrameHandler.CoreSession> futureHandler = this.client.connect(upgradeRequest);
CompletableFuture<FrameCapture> futureCapture = futureHandler.thenCombine(futureOnCapture, (channel, capture) -> capture); CompletableFuture<FrameCapture> futureCapture = futureHandler.thenCombine(futureOnCapture, (channel, capture) -> capture);
this.frameCapture = futureCapture.get(10, TimeUnit.SECONDS); this.frameCapture = futureCapture.get(10, TimeUnit.SECONDS);
} }
@ -102,7 +102,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
@Override @Override
public void close() throws Exception public void close() throws Exception
{ {
this.rawClient.stop(); this.client.stop();
} }
@Override @Override
@ -156,7 +156,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{ {
try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire()) try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire())
{ {
frameCapture.channel.sendFrame(f, blocker, false); frameCapture.session.sendFrame(f, blocker, false);
} }
} }
} }
@ -168,7 +168,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{ {
try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire()) try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire())
{ {
frameCapture.channel.sendFrame(f, blocker, false); frameCapture.session.sendFrame(f, blocker, false);
} }
} }
} }
@ -184,10 +184,6 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
} }
} }
public static class RawWebSocketClient extends WebSocketCoreClient
{
}
public static class RawUpgradeRequest extends UpgradeRequest public static class RawUpgradeRequest extends UpgradeRequest
{ {
private final CompletableFuture<FrameCapture> futureCapture; private final CompletableFuture<FrameCapture> futureCapture;
@ -226,7 +222,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>(); private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>();
private final EndPoint endPoint; private final EndPoint endPoint;
private final SharedBlockingCallback blockingCallback = new SharedBlockingCallback(); private final SharedBlockingCallback blockingCallback = new SharedBlockingCallback();
private CoreSession channel; private CoreSession session;
public FrameCapture(EndPoint endPoint) public FrameCapture(EndPoint endPoint)
{ {
@ -247,20 +243,26 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
receivedFrames.offer(Frame.copy(frame)); receivedFrames.offer(Frame.copy(frame));
callback.succeeded(); synchronized(this)
{
callback.succeeded();
}
} }
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession) throws Exception
{ {
this.channel = coreSession; this.session = coreSession;
} }
public void writeRaw(ByteBuffer buffer) throws IOException public void writeRaw(ByteBuffer buffer) throws IOException
{ {
try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire()) synchronized (this)
{ {
this.endPoint.write(blocker, buffer); try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire())
{
this.endPoint.write(blocker, buffer);
}
} }
} }
} }