Issue #4501 - remove all SharedBlockingCallback usage from websocket

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-02-03 17:20:45 +11:00
parent 1b10e2330e
commit 182daf8504
5 changed files with 25 additions and 46 deletions

View File

@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -41,7 +41,6 @@ public class MessageOutputStream extends OutputStream
private final CoreSession coreSession;
private final ByteBufferPool bufferPool;
private final SharedBlockingCallback blocker;
private long frameCount;
private long bytesSent;
private Frame frame;
@ -53,7 +52,6 @@ public class MessageOutputStream extends OutputStream
{
this.coreSession = coreSession;
this.bufferPool = bufferPool;
this.blocker = new SharedBlockingCallback();
this.buffer = bufferPool.acquire(bufferSize, true);
BufferUtil.flipToFill(buffer);
this.frame = new Frame(OpCode.BINARY);
@ -118,11 +116,9 @@ public class MessageOutputStream extends OutputStream
frame.setFin(fin);
int initialBufferSize = buffer.remaining();
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.sendFrame(frame, b, false);
b.block();
}
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
++frameCount;
// Any flush after the first will be a CONTINUATION frame.

View File

@ -27,7 +27,7 @@ import java.nio.charset.CodingErrorAction;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -50,7 +50,6 @@ public class MessageWriter extends Writer
.onMalformedInput(CodingErrorAction.REPORT);
private final CoreSession coreSession;
private final SharedBlockingCallback blocker;
private long frameCount;
private Frame frame;
private CharBuffer buffer;
@ -60,7 +59,6 @@ public class MessageWriter extends Writer
public MessageWriter(CoreSession coreSession, int bufferSize)
{
this.coreSession = coreSession;
this.blocker = new SharedBlockingCallback();
this.buffer = CharBuffer.allocate(bufferSize);
this.frame = new Frame(OpCode.TEXT);
}
@ -128,11 +126,9 @@ public class MessageWriter extends Writer
frame.setPayload(payload);
frame.setFin(fin);
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.sendFrame(frame, b, false);
b.block();
}
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
++frameCount;
// Any flush after the first will be a CONTINUATION frame.

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.javax.tests;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@ -33,7 +34,7 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -52,7 +53,6 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
private final RawUpgradeRequest upgradeRequest;
private final UnitGenerator generator;
private final FrameCapture frameCapture;
private SharedBlockingCallback sharedBlockingCallback = new SharedBlockingCallback();
public NetworkFuzzer(LocalServer server) throws Exception
{
@ -155,23 +155,16 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{
for (Frame f : frames)
{
try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire())
{
frameCapture.coreSession.sendFrame(f, blocker, false);
}
FutureCallback callback = new FutureCallback();
frameCapture.coreSession.sendFrame(f, callback, false);
callback.block();
}
}
@Override
public void sendFrames(Frame... frames) throws IOException
{
for (Frame f : frames)
{
try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire())
{
frameCapture.coreSession.sendFrame(f, blocker, false);
}
}
sendFrames(Arrays.asList(frames));
}
@Override
@ -227,7 +220,6 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>();
private EndPoint endPoint;
private CountDownLatch openLatch = new CountDownLatch(1);
private final SharedBlockingCallback blockingCallback = new SharedBlockingCallback();
private CoreSession coreSession;
public void setEndPoint(EndPoint endpoint)
@ -275,10 +267,9 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
synchronized (this)
{
try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire())
{
endPoint.write(blocker, buffer);
}
FutureCallback callback = new FutureCallback();
endPoint.write(callback, buffer);
callback.block();
}
}
}

View File

@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -41,7 +41,6 @@ public class MessageOutputStream extends OutputStream
private final CoreSession coreSession;
private final ByteBufferPool bufferPool;
private final SharedBlockingCallback blocker;
private long frameCount;
private long bytesSent;
private Frame frame;
@ -53,7 +52,6 @@ public class MessageOutputStream extends OutputStream
{
this.coreSession = coreSession;
this.bufferPool = bufferPool;
this.blocker = new SharedBlockingCallback();
this.buffer = bufferPool.acquire(bufferSize, true);
BufferUtil.flipToFill(buffer);
this.frame = new Frame(OpCode.BINARY);
@ -116,8 +114,10 @@ public class MessageOutputStream extends OutputStream
BufferUtil.flipToFlush(buffer, 0);
frame.setPayload(buffer);
frame.setFin(fin);
try (SharedBlockingCallback.Blocker b = blocker.acquire())
try
{
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
assert buffer.remaining() == 0;

View File

@ -27,7 +27,7 @@ import java.nio.charset.CodingErrorAction;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -50,7 +50,6 @@ public class MessageWriter extends Writer
.onMalformedInput(CodingErrorAction.REPORT);
private final CoreSession coreSession;
private final SharedBlockingCallback blocker;
private long frameCount;
private Frame frame;
private CharBuffer buffer;
@ -60,7 +59,6 @@ public class MessageWriter extends Writer
public MessageWriter(CoreSession coreSession, int bufferSize)
{
this.coreSession = coreSession;
this.blocker = new SharedBlockingCallback();
this.buffer = CharBuffer.allocate(bufferSize);
this.frame = new Frame(OpCode.TEXT);
}
@ -128,11 +126,9 @@ public class MessageWriter extends Writer
frame.setPayload(payload);
frame.setFin(fin);
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.sendFrame(frame, b, false);
b.block();
}
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
++frameCount;
// Any flush after the first will be a CONTINUATION frame.