Merge pull request #4539 from eclipse/jetty-10.0.x-4501-SharedBlockingCallback

Issue #4501 - remove all SharedBlockingCallback usage from websocket
This commit is contained in:
Lachlan 2020-02-05 10:07:12 +11:00 committed by GitHub
commit 41a7b8d4e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 32 additions and 47 deletions

View File

@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -41,7 +41,6 @@ public class MessageOutputStream extends OutputStream
private final CoreSession coreSession;
private final ByteBufferPool bufferPool;
private final SharedBlockingCallback blocker;
private long frameCount;
private long bytesSent;
private Frame frame;
@ -53,7 +52,6 @@ public class MessageOutputStream extends OutputStream
{
this.coreSession = coreSession;
this.bufferPool = bufferPool;
this.blocker = new SharedBlockingCallback();
this.buffer = bufferPool.acquire(bufferSize, true);
BufferUtil.flipToFill(buffer);
this.frame = new Frame(OpCode.BINARY);
@ -118,11 +116,9 @@ public class MessageOutputStream extends OutputStream
frame.setFin(fin);
int initialBufferSize = buffer.remaining();
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
}
++frameCount;
// Any flush after the first will be a CONTINUATION frame.

View File

@ -27,7 +27,7 @@ import java.nio.charset.CodingErrorAction;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -50,7 +50,6 @@ public class MessageWriter extends Writer
.onMalformedInput(CodingErrorAction.REPORT);
private final CoreSession coreSession;
private final SharedBlockingCallback blocker;
private long frameCount;
private Frame frame;
private CharBuffer buffer;
@ -60,7 +59,6 @@ public class MessageWriter extends Writer
public MessageWriter(CoreSession coreSession, int bufferSize)
{
this.coreSession = coreSession;
this.blocker = new SharedBlockingCallback();
this.buffer = CharBuffer.allocate(bufferSize);
this.frame = new Frame(OpCode.TEXT);
}
@ -128,11 +126,9 @@ public class MessageWriter extends Writer
frame.setPayload(payload);
frame.setFin(fin);
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
}
++frameCount;
// Any flush after the first will be a CONTINUATION frame.

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.javax.tests;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@ -33,7 +34,7 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -52,7 +53,6 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
private final RawUpgradeRequest upgradeRequest;
private final UnitGenerator generator;
private final FrameCapture frameCapture;
private SharedBlockingCallback sharedBlockingCallback = new SharedBlockingCallback();
public NetworkFuzzer(LocalServer server) throws Exception
{
@ -155,23 +155,16 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{
for (Frame f : frames)
{
try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire())
{
frameCapture.coreSession.sendFrame(f, blocker, false);
}
FutureCallback callback = new FutureCallback();
frameCapture.coreSession.sendFrame(f, callback, false);
callback.block();
}
}
@Override
public void sendFrames(Frame... frames) throws IOException
{
for (Frame f : frames)
{
try (SharedBlockingCallback.Blocker blocker = sharedBlockingCallback.acquire())
{
frameCapture.coreSession.sendFrame(f, blocker, false);
}
}
sendFrames(Arrays.asList(frames));
}
@Override
@ -227,7 +220,6 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>();
private EndPoint endPoint;
private CountDownLatch openLatch = new CountDownLatch(1);
private final SharedBlockingCallback blockingCallback = new SharedBlockingCallback();
private CoreSession coreSession;
public void setEndPoint(EndPoint endpoint)
@ -275,10 +267,9 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
synchronized (this)
{
try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire())
{
endPoint.write(blocker, buffer);
}
FutureCallback callback = new FutureCallback();
endPoint.write(callback, buffer);
callback.block();
}
}
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.javax.tests.server;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@ -38,7 +40,9 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class IdleTimeoutTest
{
@ -76,7 +80,9 @@ public class IdleTimeoutTest
// wait 1 second to allow timeout to fire off
TimeUnit.SECONDS.sleep(1);
session.sendFrames(new Frame(OpCode.TEXT).setPayload("You shouldn't be there"));
IOException error = assertThrows(IOException.class,
() -> session.sendFrames(new Frame(OpCode.TEXT).setPayload("You shouldn't be there")));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
BlockingQueue<Frame> framesQueue = session.getOutputFrames();
Frame frame = framesQueue.poll(1, TimeUnit.SECONDS);

View File

@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -41,7 +41,6 @@ public class MessageOutputStream extends OutputStream
private final CoreSession coreSession;
private final ByteBufferPool bufferPool;
private final SharedBlockingCallback blocker;
private long frameCount;
private long bytesSent;
private Frame frame;
@ -53,7 +52,6 @@ public class MessageOutputStream extends OutputStream
{
this.coreSession = coreSession;
this.bufferPool = bufferPool;
this.blocker = new SharedBlockingCallback();
this.buffer = bufferPool.acquire(bufferSize, true);
BufferUtil.flipToFill(buffer);
this.frame = new Frame(OpCode.BINARY);
@ -116,8 +114,10 @@ public class MessageOutputStream extends OutputStream
BufferUtil.flipToFlush(buffer, 0);
frame.setPayload(buffer);
frame.setFin(fin);
try (SharedBlockingCallback.Blocker b = blocker.acquire())
try
{
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
assert buffer.remaining() == 0;

View File

@ -27,7 +27,7 @@ import java.nio.charset.CodingErrorAction;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -50,7 +50,6 @@ public class MessageWriter extends Writer
.onMalformedInput(CodingErrorAction.REPORT);
private final CoreSession coreSession;
private final SharedBlockingCallback blocker;
private long frameCount;
private Frame frame;
private CharBuffer buffer;
@ -60,7 +59,6 @@ public class MessageWriter extends Writer
public MessageWriter(CoreSession coreSession, int bufferSize)
{
this.coreSession = coreSession;
this.blocker = new SharedBlockingCallback();
this.buffer = CharBuffer.allocate(bufferSize);
this.frame = new Frame(OpCode.TEXT);
}
@ -128,11 +126,9 @@ public class MessageWriter extends Writer
frame.setPayload(payload);
frame.setFin(fin);
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();
}
++frameCount;
// Any flush after the first will be a CONTINUATION frame.