Use static exceptions for closing websocket flushers and in ContentProducer (#8155)
* Use StaticException class in jetty-util for websocket flushers. * Use StaticException class for ContentProducer recycle and consumeAll Signed-off-by: Lachlan Roberts <lachlan@webtide.com> Signed-off-by: Ludovic Orban <lorban@bitronix.be> Co-authored-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
b1c19c0b0f
commit
0699bc5326
|
@ -19,6 +19,7 @@ import java.util.concurrent.locks.Condition;
|
||||||
|
|
||||||
import org.eclipse.jetty.http.BadMessageException;
|
import org.eclipse.jetty.http.BadMessageException;
|
||||||
import org.eclipse.jetty.http.HttpStatus;
|
import org.eclipse.jetty.http.HttpStatus;
|
||||||
|
import org.eclipse.jetty.util.StaticException;
|
||||||
import org.eclipse.jetty.util.component.Destroyable;
|
import org.eclipse.jetty.util.component.Destroyable;
|
||||||
import org.eclipse.jetty.util.thread.AutoLock;
|
import org.eclipse.jetty.util.thread.AutoLock;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -31,15 +32,8 @@ import org.slf4j.LoggerFactory;
|
||||||
class AsyncContentProducer implements ContentProducer
|
class AsyncContentProducer implements ContentProducer
|
||||||
{
|
{
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
|
||||||
private static final HttpInput.ErrorContent RECYCLED_ERROR_CONTENT = new HttpInput.ErrorContent(new IllegalStateException("ContentProducer has been recycled"));
|
private static final HttpInput.ErrorContent RECYCLED_ERROR_CONTENT = new HttpInput.ErrorContent(new StaticException("ContentProducer has been recycled"));
|
||||||
private static final Throwable UNCONSUMED_CONTENT_EXCEPTION = new IOException("Unconsumed content")
|
private static final Throwable UNCONSUMED_CONTENT_EXCEPTION = new StaticException("Unconsumed content");
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Throwable fillInStackTrace()
|
|
||||||
{
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private final AutoLock _lock = new AutoLock();
|
private final AutoLock _lock = new AutoLock();
|
||||||
private final HttpChannel _httpChannel;
|
private final HttpChannel _httpChannel;
|
||||||
|
@ -188,10 +182,10 @@ class AsyncContentProducer implements ContentProducer
|
||||||
{
|
{
|
||||||
assertLocked();
|
assertLocked();
|
||||||
Throwable x = UNCONSUMED_CONTENT_EXCEPTION;
|
Throwable x = UNCONSUMED_CONTENT_EXCEPTION;
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isTraceEnabled())
|
||||||
{
|
{
|
||||||
x = new IOException("Unconsumed content");
|
x = new StaticException("Unconsumed content", true);
|
||||||
LOG.debug("consumeAll {}", this, x);
|
LOG.trace("consumeAll {}", this, x);
|
||||||
}
|
}
|
||||||
failCurrentContent(x);
|
failCurrentContent(x);
|
||||||
// A specific HttpChannel mechanism must be used as the following code
|
// A specific HttpChannel mechanism must be used as the following code
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
//
|
||||||
|
// This program and the accompanying materials are made available under the
|
||||||
|
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||||
|
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||||
|
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.util;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception can safely be stored in a static variable as suppressed exceptions are disabled,
|
||||||
|
* meaning calling {@link #addSuppressed(Throwable)} has no effect.
|
||||||
|
* This prevents potential memory leaks where a statically-stored exception would accumulate
|
||||||
|
* suppressed exceptions added to them.
|
||||||
|
*/
|
||||||
|
public class StaticException extends Exception
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Create an instance with writable stack trace and suppression disabled.
|
||||||
|
*
|
||||||
|
* @param message – the detail message
|
||||||
|
*
|
||||||
|
* @see Throwable#Throwable(String, Throwable, boolean, boolean)
|
||||||
|
*/
|
||||||
|
public StaticException(String message)
|
||||||
|
{
|
||||||
|
this(message, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an instance with suppression disabled.
|
||||||
|
*
|
||||||
|
* @param message – the detail message
|
||||||
|
* @param writableStackTrace whether or not the stack trace should be writable
|
||||||
|
*
|
||||||
|
* @see Throwable#Throwable(String, Throwable, boolean, boolean)
|
||||||
|
*/
|
||||||
|
public StaticException(String message, boolean writableStackTrace)
|
||||||
|
{
|
||||||
|
// Make sure to call the super constructor that disables suppressed exception.
|
||||||
|
super(message, null, false, writableStackTrace);
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ import java.util.function.LongConsumer;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.CountingCallback;
|
import org.eclipse.jetty.util.CountingCallback;
|
||||||
import org.eclipse.jetty.util.IteratingCallback;
|
import org.eclipse.jetty.util.IteratingCallback;
|
||||||
|
import org.eclipse.jetty.util.StaticException;
|
||||||
import org.eclipse.jetty.websocket.core.Extension;
|
import org.eclipse.jetty.websocket.core.Extension;
|
||||||
import org.eclipse.jetty.websocket.core.Frame;
|
import org.eclipse.jetty.websocket.core.Frame;
|
||||||
import org.eclipse.jetty.websocket.core.IncomingFrames;
|
import org.eclipse.jetty.websocket.core.IncomingFrames;
|
||||||
|
@ -38,6 +39,8 @@ import org.eclipse.jetty.websocket.core.IncomingFrames;
|
||||||
*/
|
*/
|
||||||
public abstract class DemandingFlusher extends IteratingCallback implements DemandChain
|
public abstract class DemandingFlusher extends IteratingCallback implements DemandChain
|
||||||
{
|
{
|
||||||
|
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");
|
||||||
|
|
||||||
private final IncomingFrames _emitFrame;
|
private final IncomingFrames _emitFrame;
|
||||||
private final AtomicLong _demand = new AtomicLong();
|
private final AtomicLong _demand = new AtomicLong();
|
||||||
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
|
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
|
||||||
|
@ -101,6 +104,18 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
|
||||||
succeeded();
|
succeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to close this flusher when there is no explicit failure.
|
||||||
|
*/
|
||||||
|
public void closeFlusher()
|
||||||
|
{
|
||||||
|
if (_failure.compareAndSet(null, SENTINEL_CLOSE_EXCEPTION))
|
||||||
|
{
|
||||||
|
failed(SENTINEL_CLOSE_EXCEPTION);
|
||||||
|
iterate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to fail this flusher possibly from an external event such as a callback.
|
* Used to fail this flusher possibly from an external event such as a callback.
|
||||||
* @param t the failure.
|
* @param t the failure.
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.eclipse.jetty.io.EndPoint;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.IteratingCallback;
|
import org.eclipse.jetty.util.IteratingCallback;
|
||||||
|
import org.eclipse.jetty.util.StaticException;
|
||||||
import org.eclipse.jetty.util.TypeUtil;
|
import org.eclipse.jetty.util.TypeUtil;
|
||||||
import org.eclipse.jetty.util.thread.AutoLock;
|
import org.eclipse.jetty.util.thread.AutoLock;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
@ -44,6 +45,7 @@ public class FrameFlusher extends IteratingCallback
|
||||||
{
|
{
|
||||||
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
|
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(FrameFlusher.class);
|
private static final Logger LOG = LoggerFactory.getLogger(FrameFlusher.class);
|
||||||
|
private static final Throwable CLOSED_CHANNEL = new StaticException("Closed");
|
||||||
|
|
||||||
private final AutoLock lock = new AutoLock();
|
private final AutoLock lock = new AutoLock();
|
||||||
private final LongAdder messagesOut = new LongAdder();
|
private final LongAdder messagesOut = new LongAdder();
|
||||||
|
@ -184,15 +186,7 @@ public class FrameFlusher extends IteratingCallback
|
||||||
{
|
{
|
||||||
try (AutoLock l = lock.lock())
|
try (AutoLock l = lock.lock())
|
||||||
{
|
{
|
||||||
// TODO: find a way to not create exception if cause is null.
|
closedCause = cause == null ? CLOSED_CHANNEL : cause;
|
||||||
closedCause = cause == null ? new ClosedChannelException()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Throwable fillInStackTrace()
|
|
||||||
{
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
} : cause;
|
|
||||||
}
|
}
|
||||||
iterate();
|
iterate();
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@
|
||||||
package org.eclipse.jetty.websocket.core.internal;
|
package org.eclipse.jetty.websocket.core.internal;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.ClosedChannelException;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -150,17 +149,8 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem
|
||||||
@Override
|
@Override
|
||||||
public void close()
|
public void close()
|
||||||
{
|
{
|
||||||
// TODO: use IteratingCallback.close() instead of creating exception with failFlusher methods.
|
incomingFlusher.closeFlusher();
|
||||||
ClosedChannelException exception = new ClosedChannelException()
|
outgoingFlusher.closeFlusher();
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Throwable fillInStackTrace()
|
|
||||||
{
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
incomingFlusher.failFlusher(exception);
|
|
||||||
outgoingFlusher.failFlusher(exception);
|
|
||||||
releaseInflater();
|
releaseInflater();
|
||||||
releaseDeflater();
|
releaseDeflater();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import java.util.Queue;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.IteratingCallback;
|
import org.eclipse.jetty.util.IteratingCallback;
|
||||||
|
import org.eclipse.jetty.util.StaticException;
|
||||||
import org.eclipse.jetty.util.thread.AutoLock;
|
import org.eclipse.jetty.util.thread.AutoLock;
|
||||||
import org.eclipse.jetty.websocket.core.Frame;
|
import org.eclipse.jetty.websocket.core.Frame;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
|
||||||
public abstract class TransformingFlusher
|
public abstract class TransformingFlusher
|
||||||
{
|
{
|
||||||
private final Logger log = LoggerFactory.getLogger(this.getClass());
|
private final Logger log = LoggerFactory.getLogger(this.getClass());
|
||||||
|
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");
|
||||||
|
|
||||||
private final AutoLock lock = new AutoLock();
|
private final AutoLock lock = new AutoLock();
|
||||||
private final Queue<FrameEntry> entries = new ArrayDeque<>();
|
private final Queue<FrameEntry> entries = new ArrayDeque<>();
|
||||||
|
@ -77,13 +79,20 @@ public abstract class TransformingFlusher
|
||||||
notifyCallbackFailure(callback, failure);
|
notifyCallbackFailure(callback, failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to close this flusher when there is no explicit failure.
|
||||||
|
*/
|
||||||
|
public void closeFlusher()
|
||||||
|
{
|
||||||
|
failFlusher(SENTINEL_CLOSE_EXCEPTION);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to fail this flusher possibly from an external event such as a callback.
|
* Used to fail this flusher possibly from an external event such as a callback.
|
||||||
* @param t the failure.
|
* @param t the failure.
|
||||||
*/
|
*/
|
||||||
public void failFlusher(Throwable t)
|
public void failFlusher(Throwable t)
|
||||||
{
|
{
|
||||||
// TODO: find a way to close the flusher in non error case without exception.
|
|
||||||
boolean failed = false;
|
boolean failed = false;
|
||||||
try (AutoLock l = lock.lock())
|
try (AutoLock l = lock.lock())
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue