Merge branch 'jetty-10.0.x' into jetty-10.0.x-10135-websocketFlush
This commit is contained in:
commit
74d136076a
|
@ -109,7 +109,8 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
case OSHUTTING:
|
||||
if (!writeState.compareAndSet(current, WriteState.OSHUT))
|
||||
break;
|
||||
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.from(this::oshutSuccess, this::oshutFailure));
|
||||
Callback oshutCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::oshutSuccess, this::oshutFailure);
|
||||
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), oshutCallback);
|
||||
return;
|
||||
case PENDING:
|
||||
if (!writeState.compareAndSet(current, WriteState.OSHUTTING))
|
||||
|
@ -177,7 +178,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
if (closed.compareAndSet(false, true))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("closing {}, cause: {}", this, cause);
|
||||
LOG.debug("closing {}", this, cause);
|
||||
shutdownOutput();
|
||||
stream.close();
|
||||
onClose(cause);
|
||||
|
@ -188,7 +189,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
public int fill(ByteBuffer sink) throws IOException
|
||||
{
|
||||
Entry entry;
|
||||
try (AutoLock l = lock.lock())
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
entry = dataQueue.poll();
|
||||
}
|
||||
|
@ -222,7 +223,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
|
||||
if (source.hasRemaining())
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
dataQueue.offerFirst(entry);
|
||||
}
|
||||
|
@ -248,34 +249,22 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("flushing {} on {}", BufferUtil.toDetailString(buffers), this);
|
||||
if (buffers == null || buffers.length == 0)
|
||||
{
|
||||
if (buffers == null || buffers.length == 0 || remaining(buffers) == 0)
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
|
||||
// Differently from other EndPoint implementations, where write() calls flush(),
|
||||
// in this implementation all the work is done in write(), and flush() is mostly
|
||||
// a no-operation.
|
||||
// This is because the flush() semantic is that it must not leave pending
|
||||
// operations if it cannot write the buffers; therefore we cannot call
|
||||
// stream.data() from flush() because if the stream is congested, the buffers
|
||||
// would not be fully written, we would return false from flush(), but
|
||||
// stream.data() would remain as a pending operation.
|
||||
|
||||
WriteState current = writeState.get();
|
||||
switch (current.state)
|
||||
{
|
||||
case IDLE:
|
||||
if (!writeState.compareAndSet(current, WriteState.PENDING))
|
||||
break;
|
||||
// We must copy the buffers because, differently from
|
||||
// write(), the semantic of flush() is that it does not
|
||||
// own them, but stream.data() needs to own them.
|
||||
ByteBuffer buffer = coalesce(buffers, true);
|
||||
Callback.Completable callback = new Callback.Completable(Invocable.InvocationType.NON_BLOCKING);
|
||||
stream.data(new DataFrame(stream.getId(), buffer, false), callback);
|
||||
callback.whenComplete((nothing, failure) ->
|
||||
{
|
||||
if (failure == null)
|
||||
flushSuccess();
|
||||
else
|
||||
flushFailure(failure);
|
||||
});
|
||||
return callback.isDone();
|
||||
case PENDING:
|
||||
return false;
|
||||
case OSHUTTING:
|
||||
|
@ -286,54 +275,8 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
if (failure instanceof IOException)
|
||||
throw (IOException)failure;
|
||||
throw new IOException(failure);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void flushSuccess()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
WriteState current = writeState.get();
|
||||
switch (current.state)
|
||||
{
|
||||
case IDLE:
|
||||
case OSHUT:
|
||||
throw new IllegalStateException();
|
||||
case PENDING:
|
||||
if (!writeState.compareAndSet(current, WriteState.IDLE))
|
||||
break;
|
||||
return;
|
||||
case OSHUTTING:
|
||||
shutdownOutput();
|
||||
return;
|
||||
case FAILED:
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void flushFailure(Throwable failure)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
WriteState current = writeState.get();
|
||||
switch (current.state)
|
||||
{
|
||||
case IDLE:
|
||||
case OSHUT:
|
||||
throw new IllegalStateException();
|
||||
case PENDING:
|
||||
if (!writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure)))
|
||||
break;
|
||||
return;
|
||||
case OSHUTTING:
|
||||
shutdownOutput();
|
||||
return;
|
||||
case FAILED:
|
||||
return;
|
||||
}
|
||||
default:
|
||||
throw new IllegalStateException("Unexpected state: " + current.state);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -397,8 +340,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
if (!writeState.compareAndSet(current, WriteState.PENDING))
|
||||
break;
|
||||
// TODO: we really need a Stream primitive to write multiple frames.
|
||||
ByteBuffer result = coalesce(buffers, false);
|
||||
stream.data(new DataFrame(stream.getId(), result, false), Callback.from(() -> writeSuccess(callback), x -> writeFailure(x, callback)));
|
||||
ByteBuffer result = coalesce(buffers);
|
||||
Callback dataCallback = Callback.from(Invocable.getInvocationType(callback), () -> writeSuccess(callback), x -> writeFailure(x, callback));
|
||||
stream.data(new DataFrame(stream.getId(), result, false), dataCallback);
|
||||
return;
|
||||
case PENDING:
|
||||
callback.failed(new WritePendingException());
|
||||
|
@ -410,6 +354,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
case FAILED:
|
||||
callback.failed(current.failure);
|
||||
return;
|
||||
default:
|
||||
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -438,6 +385,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
case FAILED:
|
||||
callback.failed(current.failure);
|
||||
return;
|
||||
default:
|
||||
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -461,23 +411,21 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
return;
|
||||
case FAILED:
|
||||
return;
|
||||
default:
|
||||
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private long remaining(ByteBuffer... buffers)
|
||||
{
|
||||
long total = 0;
|
||||
for (ByteBuffer buffer : buffers)
|
||||
{
|
||||
total += buffer.remaining();
|
||||
}
|
||||
return total;
|
||||
return BufferUtil.remaining(buffers);
|
||||
}
|
||||
|
||||
private ByteBuffer coalesce(ByteBuffer[] buffers, boolean forceCopy)
|
||||
private ByteBuffer coalesce(ByteBuffer[] buffers)
|
||||
{
|
||||
if (buffers.length == 1 && !forceCopy)
|
||||
if (buffers.length == 1)
|
||||
return buffers[0];
|
||||
long capacity = remaining(buffers);
|
||||
if (capacity > Integer.MAX_VALUE)
|
||||
|
@ -567,7 +515,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
|
||||
private void offer(ByteBuffer buffer, Callback callback, Throwable failure)
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
dataQueue.offer(new Entry(buffer, callback, failure));
|
||||
}
|
||||
|
@ -576,7 +524,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
|
|||
protected void process()
|
||||
{
|
||||
boolean empty;
|
||||
try (AutoLock l = lock.lock())
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
empty = dataQueue.isEmpty();
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ public abstract class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, A
|
|||
private CloseState closeState = CloseState.NOT_CLOSED;
|
||||
private FrameState frameState = FrameState.INITIAL;
|
||||
private long idleTimeout;
|
||||
private long expireNanoTime;
|
||||
private long expireNanoTime = Long.MAX_VALUE;
|
||||
private Object attachment;
|
||||
|
||||
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint, boolean local)
|
||||
|
|
|
@ -47,7 +47,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
|||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CyclicTimeouts.class);
|
||||
|
||||
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);
|
||||
private final AtomicLong earliestNanoTime = new AtomicLong(Long.MAX_VALUE);
|
||||
private final CyclicTimeout cyclicTimeout;
|
||||
|
||||
public CyclicTimeouts(Scheduler scheduler)
|
||||
|
@ -82,7 +82,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
|||
// A concurrent call to schedule(long) may lose an
|
||||
// earliest value, but the corresponding entity will
|
||||
// be seen during the iteration below.
|
||||
earliestTimeout.set(earliest);
|
||||
earliestNanoTime.set(earliest);
|
||||
|
||||
Iterator<T> iterator = iterator();
|
||||
if (iterator == null)
|
||||
|
@ -95,12 +95,16 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
|||
T expirable = iterator.next();
|
||||
long expiresAt = expirable.getExpireNanoTime();
|
||||
|
||||
if (expiresAt == Long.MAX_VALUE)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Entity {} does not expire for {}", expirable, this);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Entity {} expires in {} ms for {}", expirable, NanoTime.millisElapsed(now, expiresAt), this);
|
||||
|
||||
if (expiresAt == -1)
|
||||
continue;
|
||||
|
||||
if (NanoTime.isBeforeOrSame(expiresAt, now))
|
||||
{
|
||||
boolean remove = onExpired(expirable);
|
||||
|
@ -135,7 +139,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
|||
// Schedule a timeout for the earliest entity that may expire.
|
||||
// When the timeout expires, scan the entities for the next
|
||||
// earliest entity that may expire, and reschedule a new timeout.
|
||||
long prevEarliest = earliestTimeout.getAndUpdate(t -> NanoTime.isBefore(t, expiresAt) ? t : expiresAt);
|
||||
long prevEarliest = earliestNanoTime.getAndUpdate(t -> NanoTime.isBefore(t, expiresAt) ? t : expiresAt);
|
||||
long expires = expiresAt;
|
||||
while (NanoTime.isBefore(expires, prevEarliest))
|
||||
{
|
||||
|
@ -148,7 +152,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
|
|||
// If we lost a race and overwrote a schedule() with an earlier time, then that earlier time
|
||||
// is remembered by earliestTimeout, in which case we will loop and set it again ourselves.
|
||||
prevEarliest = expires;
|
||||
expires = earliestTimeout.get();
|
||||
expires = earliestNanoTime.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -277,7 +277,7 @@ public abstract class WriteFlusher
|
|||
if (buffers != null)
|
||||
{
|
||||
if (DEBUG)
|
||||
LOG.debug("flushed incomplete");
|
||||
LOG.debug("flush incomplete {}", this);
|
||||
PendingState pending = new PendingState(callback, address, buffers);
|
||||
if (updateState(__WRITING, pending))
|
||||
onIncompleteFlush();
|
||||
|
|
|
@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
public class CyclicTimeoutsTest
|
||||
{
|
||||
private Scheduler scheduler;
|
||||
private CyclicTimeouts<ConstantExpirable> timeouts;
|
||||
private CyclicTimeouts<CyclicTimeouts.Expirable> timeouts;
|
||||
|
||||
@BeforeEach
|
||||
public void prepare()
|
||||
|
@ -65,14 +65,14 @@ public class CyclicTimeoutsTest
|
|||
timeouts = new CyclicTimeouts<>(scheduler)
|
||||
{
|
||||
@Override
|
||||
protected Iterator<ConstantExpirable> iterator()
|
||||
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||
{
|
||||
latch.countDown();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onExpired(ConstantExpirable expirable)
|
||||
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -84,6 +84,47 @@ public class CyclicTimeoutsTest
|
|||
Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpirableEntityBecomesNonExpirable() throws Exception
|
||||
{
|
||||
long timeout = 1000;
|
||||
DynamicExpirable entity = new DynamicExpirable(NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(timeout));
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
timeouts = new CyclicTimeouts<>(scheduler)
|
||||
{
|
||||
@Override
|
||||
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||
{
|
||||
entity.expireNanoTime = Long.MAX_VALUE;
|
||||
return List.<Expirable>of(entity).iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
|
||||
{
|
||||
if (unit.toMillis(delay) > 2 * timeout)
|
||||
latch.countDown();
|
||||
return super.schedule(cyclicTimeout, delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||
{
|
||||
latch.countDown();
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
timeouts.schedule(entity);
|
||||
|
||||
// Wait until the timeouts check.
|
||||
Thread.sleep(timeout);
|
||||
|
||||
// Since the expireNanoTime was changed to Long.MAX_VALUE,
|
||||
// the entity must not have been scheduled nor expired.
|
||||
Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScheduleZero() throws Exception
|
||||
{
|
||||
|
@ -93,14 +134,14 @@ public class CyclicTimeoutsTest
|
|||
timeouts = new CyclicTimeouts<>(scheduler)
|
||||
{
|
||||
@Override
|
||||
protected Iterator<ConstantExpirable> iterator()
|
||||
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||
{
|
||||
iteratorLatch.countDown();
|
||||
return Collections.emptyIterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onExpired(ConstantExpirable expirable)
|
||||
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||
{
|
||||
expiredLatch.countDown();
|
||||
return false;
|
||||
|
@ -119,21 +160,21 @@ public class CyclicTimeoutsTest
|
|||
{
|
||||
ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
|
||||
ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS);
|
||||
Collection<ConstantExpirable> collection = new ArrayList<>();
|
||||
Collection<CyclicTimeouts.Expirable> collection = new ArrayList<>();
|
||||
collection.add(one);
|
||||
AtomicInteger iterations = new AtomicInteger();
|
||||
CountDownLatch expiredLatch = new CountDownLatch(1);
|
||||
timeouts = new CyclicTimeouts<>(scheduler)
|
||||
{
|
||||
@Override
|
||||
protected Iterator<ConstantExpirable> iterator()
|
||||
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||
{
|
||||
iterations.incrementAndGet();
|
||||
return collection.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onExpired(ConstantExpirable expirable)
|
||||
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||
{
|
||||
assertSame(one, expirable);
|
||||
expiredLatch.countDown();
|
||||
|
@ -169,22 +210,22 @@ public class CyclicTimeoutsTest
|
|||
long delayMs = 2000;
|
||||
ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS);
|
||||
ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS);
|
||||
Collection<ConstantExpirable> collection = new ArrayList<>();
|
||||
Collection<CyclicTimeouts.Expirable> collection = new ArrayList<>();
|
||||
collection.add(two);
|
||||
CountDownLatch expiredLatch = new CountDownLatch(2);
|
||||
List<ConstantExpirable> expired = new ArrayList<>();
|
||||
List<CyclicTimeouts.Expirable> expired = new ArrayList<>();
|
||||
timeouts = new CyclicTimeouts<>(scheduler)
|
||||
{
|
||||
private final AtomicBoolean overtakeScheduled = new AtomicBoolean();
|
||||
|
||||
@Override
|
||||
protected Iterator<ConstantExpirable> iterator()
|
||||
protected Iterator<CyclicTimeouts.Expirable> iterator()
|
||||
{
|
||||
return collection.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onExpired(ConstantExpirable expirable)
|
||||
protected boolean onExpired(CyclicTimeouts.Expirable expirable)
|
||||
{
|
||||
expired.add(expirable);
|
||||
expiredLatch.countDown();
|
||||
|
@ -259,4 +300,26 @@ public class CyclicTimeoutsTest
|
|||
return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString);
|
||||
}
|
||||
}
|
||||
|
||||
private static class DynamicExpirable implements CyclicTimeouts.Expirable
|
||||
{
|
||||
private long expireNanoTime;
|
||||
|
||||
public DynamicExpirable(long expireNanoTime)
|
||||
{
|
||||
this.expireNanoTime = expireNanoTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getExpireNanoTime()
|
||||
{
|
||||
return expireNanoTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[%dms]", getClass().getSimpleName(), hashCode(), NanoTime.millisUntil(expireNanoTime));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
# Jetty Logging using jetty-slf4j-impl
|
||||
#org.eclipse.jetty.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.io.ManagedSelector.LEVEL=DEBUG
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Expirable
|
||||
{
|
||||
private final Connector connector;
|
||||
private long expireNanoTime;
|
||||
private long expireNanoTime = Long.MAX_VALUE;
|
||||
|
||||
protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector)
|
||||
{
|
||||
|
|
|
@ -119,29 +119,64 @@ public class XmlConfiguration
|
|||
Class<?> t2 = p2[i].getType();
|
||||
if (t1 != t2)
|
||||
{
|
||||
// Favour derived type over base type
|
||||
compare = Boolean.compare(t1.isAssignableFrom(t2), t2.isAssignableFrom(t1));
|
||||
// prefer primitives
|
||||
compare = Boolean.compare(t2.isPrimitive(), t1.isPrimitive());
|
||||
if (compare == 0)
|
||||
{
|
||||
// favour primitive type over reference
|
||||
compare = Boolean.compare(!t1.isPrimitive(), !t2.isPrimitive());
|
||||
if (compare == 0)
|
||||
// Use name to avoid non determinant sorting
|
||||
compare = t1.getName().compareTo(t2.getName());
|
||||
}
|
||||
// prefer interfaces
|
||||
compare = Boolean.compare(t2.isInterface(), t1.isInterface());
|
||||
|
||||
// break on the first different parameter (should always be true)
|
||||
if (compare == 0)
|
||||
{
|
||||
// prefer most derived
|
||||
int d1 = calculateDepth(t1);
|
||||
int d2 = calculateDepth(t2);
|
||||
compare = Integer.compare(d2, d1);
|
||||
}
|
||||
}
|
||||
}
|
||||
// break on the first different parameter
|
||||
if (compare != 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
compare = Math.min(1, Math.max(compare, -1));
|
||||
}
|
||||
|
||||
return compare;
|
||||
// failsafe is to compare on the generic string
|
||||
if (compare == 0)
|
||||
compare = e1.toGenericString().compareTo(e2.toGenericString());
|
||||
|
||||
// Return normalized to -1, 0, 1
|
||||
return Integer.compare(compare, 0);
|
||||
};
|
||||
|
||||
private static int calculateDepth(Class<?> c)
|
||||
{
|
||||
int depth = 0;
|
||||
|
||||
if (c.isPrimitive())
|
||||
return Integer.MIN_VALUE;
|
||||
|
||||
if (c.isInterface())
|
||||
{
|
||||
Set<Class<?>> interfaces = Set.of(c);
|
||||
while (!interfaces.isEmpty())
|
||||
{
|
||||
depth++;
|
||||
interfaces = interfaces.stream().flatMap(i -> Arrays.stream(i.getInterfaces())).collect(Collectors.toSet());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
while (c != Object.class && !c.isPrimitive())
|
||||
{
|
||||
depth++;
|
||||
c = c.getSuperclass();
|
||||
}
|
||||
}
|
||||
return depth;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the standard IDs and properties expected in a jetty XML file:
|
||||
* <ul>
|
||||
|
@ -932,9 +967,11 @@ public class XmlConfiguration
|
|||
throw new IllegalArgumentException("Method name cannot be blank");
|
||||
|
||||
// Lets just try all methods for now
|
||||
Method[] methods = Arrays.stream(oClass.getMethods())
|
||||
.filter(m -> m.getName().equals(methodName))
|
||||
.sorted(EXECUTABLE_COMPARATOR)
|
||||
.toArray(Method[]::new);
|
||||
|
||||
Method[] methods = oClass.getMethods();
|
||||
Arrays.sort(methods, EXECUTABLE_COMPARATOR);
|
||||
for (Method method : methods)
|
||||
{
|
||||
if (!method.getName().equals(methodName))
|
||||
|
|
|
@ -17,17 +17,22 @@ import java.io.BufferedWriter;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.lang.reflect.Executable;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URL;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.Temporal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -59,8 +64,10 @@ import org.xml.sax.InputSource;
|
|||
import org.xml.sax.SAXException;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.eclipse.jetty.xml.XmlConfiguration.EXECUTABLE_COMPARATOR;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -728,6 +735,22 @@ public class XmlConfigurationTest
|
|||
{
|
||||
}
|
||||
|
||||
public void call(Aaa aaa)
|
||||
{
|
||||
}
|
||||
|
||||
public void call(Bbb aaa)
|
||||
{
|
||||
}
|
||||
|
||||
public void call(Ccc aaa)
|
||||
{
|
||||
}
|
||||
|
||||
public void call(Abc abc)
|
||||
{
|
||||
}
|
||||
|
||||
public void call(Object o)
|
||||
{
|
||||
}
|
||||
|
@ -745,16 +768,20 @@ public class XmlConfigurationTest
|
|||
}
|
||||
}
|
||||
|
||||
@RepeatedTest(10)
|
||||
@RepeatedTest(100)
|
||||
public void testMethodOrdering() throws Exception
|
||||
{
|
||||
List<Method> methods = Arrays.stream(TestOrder.class.getMethods()).filter(m -> "call".equals(m.getName())).collect(Collectors.toList());
|
||||
Collections.shuffle(methods);
|
||||
methods.sort(XmlConfiguration.EXECUTABLE_COMPARATOR);
|
||||
methods.sort(EXECUTABLE_COMPARATOR);
|
||||
assertThat(methods, Matchers.contains(
|
||||
TestOrder.class.getMethod("call"),
|
||||
TestOrder.class.getMethod("call", int.class),
|
||||
TestOrder.class.getMethod("call", Abc.class),
|
||||
TestOrder.class.getMethod("call", Aaa.class),
|
||||
TestOrder.class.getMethod("call", String.class),
|
||||
TestOrder.class.getMethod("call", Bbb.class),
|
||||
TestOrder.class.getMethod("call", Ccc.class),
|
||||
TestOrder.class.getMethod("call", Object.class),
|
||||
TestOrder.class.getMethod("call", String[].class),
|
||||
TestOrder.class.getMethod("call", String.class, String[].class)
|
||||
|
@ -1927,4 +1954,129 @@ public class XmlConfigurationTest
|
|||
{
|
||||
void run() throws Exception;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecutableComparator() throws Throwable
|
||||
{
|
||||
aaa(null);
|
||||
bbb(null);
|
||||
ccc(null);
|
||||
|
||||
Stream.of(XmlConfigurationTest.class.getMethods())
|
||||
.filter(m -> m.getName().length() == 3)
|
||||
.filter(m -> !m.getName().equals("foo"))
|
||||
.sorted(EXECUTABLE_COMPARATOR)
|
||||
.map(Executable::toGenericString)
|
||||
.forEach(System.out::println);
|
||||
|
||||
List<Method> methods = Arrays.asList(Arrays.stream(XmlConfigurationTest.class.getMethods())
|
||||
.filter(m -> m.getName().length() == 3)
|
||||
.toArray(Method[]::new));
|
||||
|
||||
// The implementor must also ensure that the relation is transitive: ((compare(x, y)>0) && (compare(y, z)>0)) implies compare(x, z)>0
|
||||
assertThat(EXECUTABLE_COMPARATOR.compare(methods.get(0), methods.get(1)), is(EXECUTABLE_COMPARATOR.compare(methods.get(1), methods.get(2))));
|
||||
assertThat(EXECUTABLE_COMPARATOR.compare(methods.get(0), methods.get(1)), is(EXECUTABLE_COMPARATOR.compare(methods.get(0), methods.get(2))));
|
||||
}
|
||||
|
||||
public void aaa(Aaa ignored)
|
||||
{
|
||||
}
|
||||
|
||||
public void bbb(Bbb ignored)
|
||||
{
|
||||
}
|
||||
|
||||
public void ccc(Ccc ignored)
|
||||
{
|
||||
}
|
||||
|
||||
public interface Aaa
|
||||
{
|
||||
}
|
||||
|
||||
public interface Abc extends Aaa
|
||||
{
|
||||
}
|
||||
|
||||
public static class Bbb
|
||||
{
|
||||
}
|
||||
|
||||
public static class Ccc implements Aaa
|
||||
{
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFooExecutableComparator()
|
||||
{
|
||||
List<String> orderedMethodIds = Stream.of(FooObj.class.getMethods())
|
||||
.filter(m -> m.getName().equals("foo"))
|
||||
.sorted(EXECUTABLE_COMPARATOR)
|
||||
.map(Executable::toGenericString)
|
||||
.collect(Collectors.toList());
|
||||
orderedMethodIds.forEach(System.out::println);
|
||||
String[] expectedOrder = {
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo()", // favor fewer args
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(int)", // favor primitives over non-primitives
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(java.time.temporal.Temporal)", // favor over Instant
|
||||
"public int org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(java.lang.String)",
|
||||
"public java.util.Locale org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(java.nio.charset.Charset)",
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(java.time.Instant)",
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(java.util.Locale)",
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(int,java.lang.String)",
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(java.lang.String,int)",
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(int,java.lang.String,java.lang.String)",
|
||||
"public void org.eclipse.jetty.xml.XmlConfigurationTest$FooObj.foo(int,java.lang.String,java.lang.String,java.lang.Object)"
|
||||
};
|
||||
assertThat(orderedMethodIds, contains(expectedOrder));
|
||||
}
|
||||
|
||||
public static class FooObj
|
||||
{
|
||||
public void foo()
|
||||
{
|
||||
}
|
||||
|
||||
public int foo(String name)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
public void foo(int id)
|
||||
{
|
||||
}
|
||||
|
||||
public void foo(Locale locale)
|
||||
{
|
||||
}
|
||||
|
||||
public void foo(Instant timestamp) // Instant extends from Temporal
|
||||
{
|
||||
}
|
||||
|
||||
public void foo(Temporal temporal)
|
||||
{
|
||||
}
|
||||
|
||||
public Locale foo(Charset charset)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
public void foo(String name, int id)
|
||||
{
|
||||
}
|
||||
|
||||
public void foo(int id, String name)
|
||||
{
|
||||
}
|
||||
|
||||
public void foo(int id, String name, String description)
|
||||
{
|
||||
}
|
||||
|
||||
public void foo(int id, String name, String description, Object value)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -23,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
|
@ -36,7 +38,9 @@ import org.eclipse.jetty.client.api.ContentResponse;
|
|||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
|
||||
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
|
||||
import org.eclipse.jetty.client.util.ByteBufferRequestContent;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpScheme;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
|
@ -73,6 +77,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
|
|||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
|
@ -288,6 +293,58 @@ public class ProxyWithDynamicTransportTest
|
|||
assertEquals(1, connectionPool.getConnectionCount());
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = "proxyProtocol={0}, proxySecure={1}, serverProtocol={2}, serverSecure={3}")
|
||||
@MethodSource("testParams")
|
||||
public void testProxyConcurrentLoad(Origin.Protocol proxyProtocol, boolean proxySecure, HttpVersion serverProtocol, boolean serverSecure) throws Exception
|
||||
{
|
||||
start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
}
|
||||
});
|
||||
|
||||
int parallelism = 8;
|
||||
boolean proxyMultiplexed = proxyProtocol.getProtocols().stream().allMatch(p -> p.startsWith("h2"));
|
||||
client.setMaxConnectionsPerDestination(proxyMultiplexed ? 1 : parallelism);
|
||||
|
||||
int proxyPort = proxySecure ? proxyTLSConnector.getLocalPort() : proxyConnector.getLocalPort();
|
||||
Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort);
|
||||
HttpProxy proxy = new HttpProxy(proxyAddress, proxySecure, proxyProtocol);
|
||||
client.getProxyConfiguration().addProxy(proxy);
|
||||
|
||||
String scheme = serverSecure ? "https" : "http";
|
||||
int serverPort = serverSecure ? serverTLSConnector.getLocalPort() : serverConnector.getLocalPort();
|
||||
int contentLength = 128 * 1024;
|
||||
|
||||
int iterations = 16;
|
||||
IntStream.range(0, parallelism).parallel().forEach(p ->
|
||||
IntStream.range(0, iterations).forEach(i ->
|
||||
{
|
||||
try
|
||||
{
|
||||
String id = p + "-" + i;
|
||||
ContentResponse response = client.newRequest("localhost", serverPort)
|
||||
.scheme(scheme)
|
||||
.method(HttpMethod.POST)
|
||||
.path("/path/" + id)
|
||||
.version(serverProtocol)
|
||||
.body(new ByteBufferRequestContent(ByteBuffer.allocate(contentLength)))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
assertEquals(contentLength, response.getContent().length);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHTTP2TunnelClosedByClient() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue