Merged branch 'jetty-11.0.x' into 'jetty-12.0.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-06-13 15:13:57 +02:00
commit 77825755cc
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
7 changed files with 99 additions and 50 deletions

View File

@ -0,0 +1,49 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.util;
/**
* This exception can safely be stored in a static variable as suppressed exceptions are disabled,
* meaning calling {@link #addSuppressed(Throwable)} has no effect.
* This prevents potential memory leaks where a statically-stored exception would accumulate
* suppressed exceptions added to them.
*/
public class StaticException extends Exception
{
/**
* Create an instance with writable stack trace and suppression disabled.
*
* @param message the detail message
*
* @see Throwable#Throwable(String, Throwable, boolean, boolean)
*/
public StaticException(String message)
{
this(message, false);
}
/**
* Create an instance with suppression disabled.
*
* @param message the detail message
* @param writableStackTrace whether or not the stack trace should be writable
*
* @see Throwable#Throwable(String, Throwable, boolean, boolean)
*/
public StaticException(String message, boolean writableStackTrace)
{
// Make sure to call the super constructor that disables suppressed exception.
super(message, null, false, writableStackTrace);
}
}

View File

@ -20,6 +20,7 @@ import java.util.function.LongConsumer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFrames;
@ -38,6 +39,8 @@ import org.eclipse.jetty.websocket.core.IncomingFrames;
*/
public abstract class DemandingFlusher extends IteratingCallback implements DemandChain
{
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");
private final IncomingFrames _emitFrame;
private final AtomicLong _demand = new AtomicLong();
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
@ -101,6 +104,18 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
succeeded();
}
/**
* Used to close this flusher when there is no explicit failure.
*/
public void closeFlusher()
{
if (_failure.compareAndSet(null, SENTINEL_CLOSE_EXCEPTION))
{
failed(SENTINEL_CLOSE_EXCEPTION);
iterate();
}
}
/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Scheduler;
@ -44,6 +45,7 @@ public class FrameFlusher extends IteratingCallback
{
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
private static final Logger LOG = LoggerFactory.getLogger(FrameFlusher.class);
private static final Throwable CLOSED_CHANNEL = new StaticException("Closed");
private final AutoLock lock = new AutoLock();
private final LongAdder messagesOut = new LongAdder();
@ -184,15 +186,7 @@ public class FrameFlusher extends IteratingCallback
{
try (AutoLock l = lock.lock())
{
// TODO: find a way to not create exception if cause is null.
closedCause = cause == null ? new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
} : cause;
closedCause = cause == null ? CLOSED_CHANNEL : cause;
}
iterate();
}

View File

@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.internal;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -150,17 +149,8 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem
@Override
public void close()
{
// TODO: use IteratingCallback.close() instead of creating exception with failFlusher methods.
ClosedChannelException exception = new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
};
incomingFlusher.failFlusher(exception);
outgoingFlusher.failFlusher(exception);
incomingFlusher.closeFlusher();
outgoingFlusher.closeFlusher();
releaseInflater();
releaseDeflater();
}

View File

@ -18,6 +18,7 @@ import java.util.Queue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.websocket.core.Frame;
import org.slf4j.Logger;
@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
public abstract class TransformingFlusher
{
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");
private final AutoLock lock = new AutoLock();
private final Queue<FrameEntry> entries = new ArrayDeque<>();
@ -77,13 +79,20 @@ public abstract class TransformingFlusher
notifyCallbackFailure(callback, failure);
}
/**
* Used to close this flusher when there is no explicit failure.
*/
public void closeFlusher()
{
failFlusher(SENTINEL_CLOSE_EXCEPTION);
}
/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
*/
public void failFlusher(Throwable t)
{
// TODO: find a way to close the flusher in non error case without exception.
boolean failed = false;
try (AutoLock l = lock.lock())
{

View File

@ -21,6 +21,7 @@ import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
@ -33,15 +34,7 @@ import org.slf4j.LoggerFactory;
class AsyncContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
private static final Content.Chunk.Error RECYCLED_ERROR_CHUNK = Content.Chunk.from(new IllegalStateException("ContentProducer has been recycled"));
private static final Throwable UNCONSUMED_CONTENT_EXCEPTION = new IOException("Unconsumed content")
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
};
private static final Content.Chunk.Error RECYCLED_ERROR_CHUNK = Content.Chunk.from(new StaticException("ContentProducer has been recycled"));
private final AutoLock _lock = new AutoLock();
private final ServletChannel _servletChannel;
@ -169,7 +162,7 @@ class AsyncContentProducer implements ContentProducer
LOG.debug("checkMinDataRate aborting channel {}", this);
_servletChannel.abort(bad);
}
consumeCurrentChunk(bad);
consumeCurrentChunk();
throw bad;
}
}
@ -189,27 +182,27 @@ class AsyncContentProducer implements ContentProducer
public boolean consumeAvailable()
{
assertLocked();
Throwable x = UNCONSUMED_CONTENT_EXCEPTION;
if (LOG.isTraceEnabled())
{
x = new IOException("Unconsumed content");
LOG.trace("consumeAvailable {}", this, x);
}
consumeCurrentChunk(x);
boolean atEof = consumeAvailableChunks();
boolean atEof = consumeCurrentChunk();
if (LOG.isDebugEnabled())
LOG.debug("consumed current chunk of ServletChannel EOF={} {}", atEof, this);
if (atEof)
return true;
atEof = consumeAvailableChunks();
if (LOG.isDebugEnabled())
LOG.debug("consumed available chunks of ServletChannel EOF={} {}", atEof, this);
return atEof;
}
private void consumeCurrentChunk(Throwable x)
private boolean consumeCurrentChunk()
{
if (_transformedChunk != null && !_transformedChunk.isTerminal())
{
if (_transformedChunk != _rawChunk)
{
if (LOG.isDebugEnabled())
LOG.debug("failing currently held transformed chunk {} {}", x, this);
LOG.debug("releasing current transformed chunk {}", this);
_transformedChunk.skip(_transformedChunk.remaining());
_transformedChunk.release();
}
@ -219,15 +212,13 @@ class AsyncContentProducer implements ContentProducer
if (_rawChunk != null && !_rawChunk.isTerminal())
{
if (LOG.isDebugEnabled())
LOG.debug("failing currently held raw chunk {} {}", x, this);
LOG.debug("releasing current raw chunk {}", this);
_rawChunk.skip(_rawChunk.remaining());
_rawChunk.release();
_rawChunk = null;
_rawChunk = _rawChunk.isLast() ? Content.Chunk.EOF : null;
}
Content.Chunk.Error errorChunk = Content.Chunk.from(x);
_transformedChunk = errorChunk;
_rawChunk = errorChunk;
return _rawChunk != null && _rawChunk.isLast();
}
private boolean consumeAvailableChunks()
@ -451,7 +442,7 @@ class AsyncContentProducer implements ContentProducer
IOException failure = new IOException("Interceptor " + _interceptor + " did not consume any of the " + _rawChunk.remaining() + " remaining byte(s) of chunk");
if (chunk != null)
chunk.release();
consumeCurrentChunk(failure);
consumeCurrentChunk();
// Set the _error flag to mark the chunk as definitive, i.e.:
// do not try to produce new raw chunk to get a fresher error
// when the terminal chunk was caused by the interceptor not
@ -472,7 +463,7 @@ class AsyncContentProducer implements ContentProducer
catch (Throwable x)
{
IOException failure = new IOException("bad chunk", x);
consumeCurrentChunk(failure);
consumeCurrentChunk();
// Set the _error flag to mark the chunk as definitive, i.e.:
// do not try to produce new raw chunk to get a fresher error
// when the terminal chunk was caused by the interceptor throwing.

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.Trailers;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory;
class AsyncContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
private static final Content.Chunk.Error RECYCLED_ERROR_CONTENT = Content.Chunk.from(new IllegalStateException("ContentProducer has been recycled"));
private static final Content.Chunk.Error RECYCLED_ERROR_CONTENT = Content.Chunk.from(new StaticException("ContentProducer has been recycled"));
private final AutoLock _lock = new AutoLock();
private final HttpChannel _httpChannel;