Merge branch 'jetty-9.4.x' of github.com:eclipse/jetty.project into jetty-9.4.x
This commit is contained in:
commit
39ee316a7f
|
@ -78,7 +78,7 @@ public class HttpGenerator
|
|||
FLUSH, // The buffers previously generated should be flushed
|
||||
CONTINUE, // Continue generating the message
|
||||
SHUTDOWN_OUT, // Need EOF to be signaled
|
||||
DONE // Message generation complete
|
||||
DONE // The current phase of generation is complete
|
||||
}
|
||||
|
||||
// other statics
|
||||
|
|
|
@ -732,9 +732,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
|||
{
|
||||
HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} generate: {} ({},{},{})@{}",
|
||||
this,
|
||||
LOG.debug("generate: {} for {} ({},{},{})@{}",
|
||||
result,
|
||||
this,
|
||||
BufferUtil.toSummaryString(_header),
|
||||
BufferUtil.toSummaryString(_content),
|
||||
_lastContent,
|
||||
|
@ -826,8 +826,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
|||
}
|
||||
case DONE:
|
||||
{
|
||||
// If shutdown after commit, we can still close here.
|
||||
if (getConnector().isShutdown())
|
||||
// If this is the end of the response and the connector was shutdown after response was committed,
|
||||
// we can't add the Connection:close header, but we are still allowed to close the connection
|
||||
// by shutting down the output.
|
||||
if (getConnector().isShutdown() && _generator.isEnd() && _generator.isPersistent())
|
||||
_shutdownOut = true;
|
||||
|
||||
return Action.SUCCEEDED;
|
||||
|
|
|
@ -591,17 +591,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
// handle blocking write
|
||||
|
||||
// Should we aggregate?
|
||||
int capacity = getBufferSize();
|
||||
// Yes - if the write is smaller than the commitSize (==aggregate buffer size)
|
||||
// and the write is not the last one, or is last but will fit in an already allocated aggregate buffer.
|
||||
boolean last = isLastContentToWrite(len);
|
||||
if (!last && len <= _commitSize)
|
||||
if (len <= _commitSize && (!last || len <= BufferUtil.space(_aggregate)))
|
||||
{
|
||||
acquireBuffer();
|
||||
|
||||
// YES - fill the aggregate with content from the buffer
|
||||
int filled = BufferUtil.fill(_aggregate, b, off, len);
|
||||
|
||||
// return if we are not complete, not full and filled all the content
|
||||
if (filled == len && !BufferUtil.isFull(_aggregate))
|
||||
// return if we are not the last write and have aggregated all of the content
|
||||
if (!last && filled == len && !BufferUtil.isFull(_aggregate))
|
||||
return;
|
||||
|
||||
// adjust offset/length
|
||||
|
|
|
@ -56,11 +56,13 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
|
|||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -166,6 +168,66 @@ public class GracefulStopTest
|
|||
client.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test completed writes during shutdown do not close output
|
||||
* @throws Exception on test failure
|
||||
*/
|
||||
@Test
|
||||
public void testWriteDuringShutdown() throws Exception
|
||||
{
|
||||
Server server = new Server();
|
||||
server.setStopTimeout(1000);
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
ABHandler handler = new ABHandler();
|
||||
StatisticsHandler stats = new StatisticsHandler();
|
||||
server.setHandler(stats);
|
||||
stats.setHandler(handler);
|
||||
|
||||
server.start();
|
||||
|
||||
Thread stopper = new Thread(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
handler.latchA.await();
|
||||
server.stop();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
stopper.start();
|
||||
|
||||
final int port = connector.getLocalPort();
|
||||
try(Socket client = new Socket("127.0.0.1", port))
|
||||
{
|
||||
client.getOutputStream().write((
|
||||
"GET / HTTP/1.1\r\n" +
|
||||
"Host: localhost:" + port + "\r\n" +
|
||||
"\r\n"
|
||||
).getBytes());
|
||||
client.getOutputStream().flush();
|
||||
|
||||
while (!connector.isShutdown())
|
||||
Thread.sleep(10);
|
||||
|
||||
handler.latchB.countDown();
|
||||
|
||||
String response = IO.toString(client.getInputStream());
|
||||
assertThat(response, startsWith("HTTP/1.1 200 "));
|
||||
assertThat(response, containsString("Content-Length: 2"));
|
||||
assertThat(response, containsString("Connection: close"));
|
||||
assertThat(response, endsWith("ab"));
|
||||
}
|
||||
stopper.join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test of standard graceful timeout mechanism when a block request does
|
||||
* complete. Note that even though the request completes after 100ms, the
|
||||
|
@ -736,6 +798,30 @@ public class GracefulStopTest
|
|||
}
|
||||
}
|
||||
|
||||
static class ABHandler extends AbstractHandler
|
||||
{
|
||||
final CountDownLatch latchA = new CountDownLatch(1);
|
||||
final CountDownLatch latchB = new CountDownLatch(1);
|
||||
|
||||
@Override
|
||||
public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
response.setContentLength(2);
|
||||
response.getOutputStream().write("a".getBytes());
|
||||
try
|
||||
{
|
||||
latchA.countDown();
|
||||
latchB.await();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
response.flushBuffer();
|
||||
response.getOutputStream().write("b".getBytes());
|
||||
}
|
||||
}
|
||||
|
||||
static class TestHandler extends AbstractHandler
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
|
|
@ -21,9 +21,25 @@ package org.eclipse.jetty.util.component;
|
|||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
|
||||
/* A Lifecycle that can be gracefully shutdown.
|
||||
/**
|
||||
* <p>Jetty components that wish to be part of a Graceful shutdown implement this interface so that
|
||||
* the {@link Graceful#shutdown()} method will be called to initiate a shutdown. Shutdown operations
|
||||
* can fall into the following categories:</p>
|
||||
* <ul>
|
||||
* <li>Preventing new load from being accepted (eg connectors stop accepting connections)</li>
|
||||
* <li>Preventing existing load expanding (eg stopping existing connections accepting new requests)</li>
|
||||
* <li>Waiting for existing load to complete (eg waiting for active request count to reduce to 0)</li>
|
||||
* <li>Performing cleanup operations that may take time (eg closing an SSL connection)</li>
|
||||
* </ul>
|
||||
* <p>The {@link Future} returned by the the shutdown call will be completed to indicate the shutdown operation is completed.
|
||||
* Some shutdown operations may be instantaneous and always return a completed future.
|
||||
* </p><p>
|
||||
* Graceful shutdown is typically orchestrated by the doStop methods of Server or ContextHandler (for a full or partial
|
||||
* shutdown respectively).
|
||||
* </p>
|
||||
*/
|
||||
public interface Graceful
|
||||
{
|
||||
|
@ -31,6 +47,12 @@ public interface Graceful
|
|||
|
||||
boolean isShutdown();
|
||||
|
||||
/**
|
||||
* A utility Graceful that uses a {@link FutureCallback} to indicate if shutdown is completed.
|
||||
* By default the {@link FutureCallback} is returned as already completed, but the {@link #newShutdownCallback()} method
|
||||
* can be overloaded to return a non-completed callback that will require a {@link Callback#succeeded()} or
|
||||
* {@link Callback#failed(Throwable)} call to be completed.
|
||||
*/
|
||||
class Shutdown implements Graceful
|
||||
{
|
||||
private final AtomicReference<FutureCallback> _shutdown = new AtomicReference<>();
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.junit.jupiter.api.Test;
|
|||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class SuspendResumeTest
|
||||
|
@ -195,8 +196,7 @@ public class SuspendResumeTest
|
|||
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
|
||||
assertThat(serverSocket.closeCode, is(StatusCode.NORMAL));
|
||||
|
||||
// suspend the client so that no read events occur
|
||||
SuspendToken suspendToken = clientSocket.session.suspend();
|
||||
suspendToken.resume();
|
||||
// suspend after closed throws ISE
|
||||
assertThrows(IllegalStateException.class, () -> clientSocket.session.suspend());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
private final EventDriver websocket;
|
||||
private final Executor executor;
|
||||
private final WebSocketPolicy policy;
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private final AtomicBoolean onCloseCalled = new AtomicBoolean(false);
|
||||
private ClassLoader classLoader;
|
||||
private ExtensionFactory extensionFactory;
|
||||
private RemoteEndpointFactory remoteEndpointFactory;
|
||||
|
@ -80,7 +80,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
private UpgradeRequest upgradeRequest;
|
||||
private UpgradeResponse upgradeResponse;
|
||||
private CompletableFuture<Session> openFuture;
|
||||
private AtomicBoolean onCloseCalled = new AtomicBoolean(false);
|
||||
|
||||
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, EventDriver websocket, LogicalConnection connection)
|
||||
{
|
||||
|
@ -338,10 +337,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
public boolean isOpen()
|
||||
{
|
||||
if (this.connection == null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return !closed.get() && this.connection.isOpen();
|
||||
|
||||
return !onCloseCalled.get() && this.connection.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -546,6 +544,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
@Override
|
||||
public SuspendToken suspend()
|
||||
{
|
||||
if (onCloseCalled.get())
|
||||
throw new IllegalStateException("Not open");
|
||||
|
||||
return connection.suspend();
|
||||
}
|
||||
|
||||
|
|
|
@ -87,10 +87,8 @@ class ReadState
|
|||
|
||||
/**
|
||||
* Requests that reads from the connection be suspended.
|
||||
*
|
||||
* @return whether the suspending was successful
|
||||
*/
|
||||
boolean suspending()
|
||||
void suspending()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
|
@ -101,9 +99,7 @@ class ReadState
|
|||
{
|
||||
case READING:
|
||||
state = State.SUSPENDING;
|
||||
return true;
|
||||
case EOF:
|
||||
return false;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException(toString(state));
|
||||
}
|
||||
|
@ -131,8 +127,6 @@ class ReadState
|
|||
ByteBuffer bb = buffer;
|
||||
buffer = null;
|
||||
return bb;
|
||||
case EOF:
|
||||
return null;
|
||||
default:
|
||||
throw new IllegalStateException(toString(state));
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class ReadStateTest
|
||||
{
|
||||
|
@ -50,7 +49,7 @@ public class ReadStateTest
|
|||
ReadState readState = new ReadState();
|
||||
assertThat("Initially reading", readState.isReading(), is(true));
|
||||
|
||||
assertTrue(readState.suspending());
|
||||
readState.suspending();
|
||||
assertThat("Suspending doesn't take effect immediately", readState.isSuspended(), is(false));
|
||||
|
||||
assertNull(readState.resume());
|
||||
|
@ -64,7 +63,7 @@ public class ReadStateTest
|
|||
ReadState readState = new ReadState();
|
||||
assertThat("Initially reading", readState.isReading(), is(true));
|
||||
|
||||
assertThat(readState.suspending(), is(true));
|
||||
readState.suspending();
|
||||
assertThat("Suspending doesn't take effect immediately", readState.isSuspended(), is(false));
|
||||
|
||||
ByteBuffer content = BufferUtil.toBuffer("content");
|
||||
|
@ -84,8 +83,8 @@ public class ReadStateTest
|
|||
|
||||
assertThat(readState.isReading(), is(false));
|
||||
assertThat(readState.isSuspended(), is(true));
|
||||
assertThat(readState.suspending(), is(false));
|
||||
assertThrows(IllegalStateException.class, readState::suspending);
|
||||
assertThat(readState.getAction(content), is(ReadState.Action.EOF));
|
||||
assertNull(readState.resume());
|
||||
assertThrows(IllegalStateException.class, readState::resume);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue